This commit is contained in:
David Read 2012-04-10 16:35:17 +01:00
commit 330c9b5391
36 changed files with 2233 additions and 588 deletions

View File

@ -96,8 +96,36 @@ The commands should be run with the pyenv activated and refer to your sites conf
paster --plugin=ckanext-harvest harvester sources --config=mysite.ini
Authorization Profiles
======================
Starting from CKAN 1.6.1, the harvester extension offers the hability to use
different authorization profiles. These can be defined in your ini file as::
ckan.harvest.auth.profile = <profile_name>
The two available profiles right now are:
* `default`: This is the default profile, the same one that this extension has
used historically. Basically, only sysadmins can manage anything related to
harvesting, including creating and editing harvest sources or running harvest
jobs.
* `publisher`: When using this profile, sysadmins can still perform any
harvesting related action, but in addition, users belonging to a publisher
(with role `admin`) can manage and run their own harvest sources and jobs.
Note that this requires CKAN core to also use the `publisher` authorization
profile, i.e you will also need to add::
ckan.auth.profile = publisher
To know more about the CKAN publisher auth profile, visit
http://wiki.ckan.org/Working_with_the_publisher_auth_profile
The CKAN harverster
==================
===================
The plugin includes a harvester for remote CKAN instances. To use it, you need
to add the `ckan_harvester` plugin to your options file::
@ -128,6 +156,7 @@ field. The currently supported configuration options are:
* {dataset_id}
* {harvest_source_id}
* {harvest_source_url} # Will be stripped of trailing forward slashes (/)
* {harvest_source_title} # Requires CKAN 1.6
* {harvest_job_id}
* {harvest_object_id}
@ -147,6 +176,11 @@ field. The currently supported configuration options are:
created from this harvesting source. Logged in users and visitors will be
only able to read them.
* force_all: By default, after the first harvesting, the harvester will gather
only the modified packages from the remote site since the last harvesting.
Setting this property to true will force the harvester to gather all remote
packages regardless of the modification date. Default is False.
Here is an example of a configuration object (the one that must be entered in
the configuration field)::
@ -269,7 +303,8 @@ following methods::
- performing any necessary action with the fetched object (e.g
create a CKAN package).
Note: if this stage creates or updates a package, a reference
to the package should be added to the HarvestObject.
to the package must be added to the HarvestObject.
Additionally, the HarvestObject must be flagged as current.
- creating the HarvestObject - Package relation (if necessary)
- creating and storing any suitable HarvestObjectErrors that may
occur.
@ -308,7 +343,152 @@ pending harvesting jobs::
paster --plugin=ckanext-harvest harvester run --config=mysite.ini
After packages have been imported, the search index will have to be updated
before the packages appear in search results::
Note: If you don't have the `synchronous_search` plugin loaded, you will need
to update the search index after the harvesting in order for the packages to
appear in search results::
paster --plugin=ckan search-index rebuild
Setting up the harvesters on a production server
================================================
The previous approach works fine during development or debugging, but it is
not recommended for production servers. There are several possible ways of
setting up the harvesters, which will depend on your particular infrastructure
and needs. The bottom line is that the gather and fetch process should be kept
running somehow and then the run command should be run periodically to start
any pending jobs.
The following approach is the one generally used on CKAN deployments, and it
will probably suit most of the users. It uses Supervisor_, a tool to monitor
processes, and a cron job to run the harvest jobs, and it assumes that you
have already installed and configured the harvesting extension (See
`Installation` if not).
Note: It is recommended to run the harvest process from a non-root user
(generally the one you are running CKAN with). Replace the user `okfn` in the
following steps with the one you are using.
1. Install Supervisor::
sudo apt-get install supervisor
You can check if it is running with this command::
ps aux | grep supervisord
You should see a line similar to this one::
root 9224 0.0 0.3 56420 12204 ? Ss 15:52 0:00 /usr/bin/python /usr/bin/supervisord
2. Supervisor needs to have programs added to its configuration, which will
describe the tasks that need to be monitored. This configuration files are
stored in `/etc/supervisor/conf.d`.
Create a file named `/etc/supervisor/conf.d/ckan_harvesting.conf`, and copy the following contents::
; ===============================
; ckan harvester
; ===============================
[program:ckan_gather_consumer]
command=/var/lib/ckan/std/pyenv/bin/paster --plugin=ckanext-harvest harvester gather_consumer --config=/etc/ckan/std/std.ini
; user that owns virtual environment.
user=okfn
numprocs=1
stdout_logfile=/var/log/ckan/std/gather_consumer.log
stderr_logfile=/var/log/ckan/std/gather_consumer.log
autostart=true
autorestart=true
startsecs=10
[program:ckan_fetch_consumer]
command=/var/lib/ckan/std/pyenv/bin/paster --plugin=ckanext-harvest harvester fetch_consumer --config=/etc/ckan/std/std.ini
; user that owns virtual environment.
user=okfn
numprocs=1
stdout_logfile=/var/log/ckan/std/fetch_consumer.log
stderr_logfile=/var/log/ckan/std/fetch_consumer.log
autostart=true
autorestart=true
startsecs=10
There are a number of things that you will need to replace with your
specific installation settings (the example above shows paths from a
ckan instance installed via Debian packages):
* command: The absolute path to the paster command located in the
python virtual environment and the absolute path to the config
ini file.
* user: The unix user you are running CKAN with
* stdout_logfile and stderr_logfile: All output coming from the
harvest consumers will be written to this file. Ensure that the
necessary permissions are setup.
The rest of the configuration options are pretty self explanatory. Refer
to the `Supervisor documentation <http://supervisord.org/configuration.html#program-x-section-settings>`_
to know more about these and other options available.
3. Start the supervisor tasks with the following commands::
sudo supervisorctl reread
sudo supervisorctl add ckan_gather_consumer
sudo supervisorctl add ckan_fetch_consumer
sudo supervisorctl start ckan_gather_consumer
sudo supervisorctl start ckan_fetch_consumer
To check that the processes are running, you can run::
sudo supervisorctl status
ckan_fetch_consumer RUNNING pid 6983, uptime 0:22:06
ckan_gather_consumer RUNNING pid 6968, uptime 0:22:45
Some problems you may encounter when starting the processes:
* `ckan_gather_consumer: ERROR (no such process)`
Double-check your supervisor configuration file and stop and restart the supervisor daemon::
sudo service supervisor start; sudo service supervisor stop
* `ckan_gather_consumer: ERROR (abnormal termination)`
Something prevented the command from running properly. Have a look at the log file that
you defined in the `stdout_logfile` section to see what happened. Common errors include:
* `socket.error: [Errno 111] Connection refused`
RabbitMQ is not running::
sudo service rabbitmq-server start
4. Once we have the two consumers running and monitored, we just need to create a cron job
that will run the `run` harvester command periodically. To do so, edit the cron table with
the following command (it may ask you to choose an editor)::
sudo crontab -e -u okfn
Note that we are running this command as the same user we configured the processes to be run with
(`okfn` in our example).
Paste this line into your crontab, again replacing the paths to paster and the ini file with yours::
# m h dom mon dow command
*/15 * * * * /var/lib/ckan/std/pyenv/bin/paster --plugin=ckanext-harvest harvester run --config=/etc/ckan/std/std.ini
This particular example will check for pending jobs every fifteen minutes.
You can of course modify this periodicity, this `Wikipedia page <http://en.wikipedia.org/wiki/Cron#CRON_expression>`_
has a good overview of the crontab syntax.
.. _Supervisor: http://supervisord.org

View File

@ -2,8 +2,10 @@ import sys
import re
from pprint import pprint
from ckan import model
from ckan.logic import get_action, ValidationError
from ckan.lib.cli import CkanCommand
from ckanext.harvest.lib import *
from ckanext.harvest.queue import get_gather_consumer, get_fetch_consumer
class Harvester(CkanCommand):
@ -62,6 +64,13 @@ class Harvester(CkanCommand):
def command(self):
self._load_config()
# We'll need a sysadmin user to perform most of the actions
# We will use the sysadmin site user (named as the site_id)
context = {'model':model,'session':model.Session,'ignore_auth':True}
self.admin_user = get_action('get_site_user')(context,{})
print ''
if len(self.args) == 0:
@ -97,6 +106,9 @@ class Harvester(CkanCommand):
self.import_stage()
elif cmd == 'job-all':
self.create_harvest_job_all()
elif cmd == 'harvesters-info':
harvesters_info = get_action('harvesters_info_show')()
pprint(harvesters_info)
else:
print 'Command %s not recognized' % cmd
@ -139,48 +151,50 @@ class Harvester(CkanCommand):
else:
publisher_id = u''
try:
source = create_harvest_source({
data_dict = {
'url':url,
'type':type,
'config':config,
'active':active,
'user_id':user_id,
'publisher_id':publisher_id})
'publisher_id':publisher_id}
context = {'model':model, 'session':model.Session, 'user': self.admin_user['name']}
source = get_action('harvest_source_create')(context,data_dict)
print 'Created new harvest source:'
self.print_harvest_source(source)
sources = get_harvest_sources()
sources = get_action('harvest_source_list')(context,{})
self.print_there_are('harvest source', sources)
# Create a Harvest Job for the new Source
create_harvest_job(source['id'])
# Create a harvest job for the new source
get_action('harvest_job_create')(context,{'source_id':source['id']})
print 'A new Harvest Job for this source has also been created'
except ValidationError,e:
print 'An error occurred:'
print str(e.error_dict)
raise e
def remove_harvest_source(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
remove_harvest_source(source_id)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
get_action('harvest_source_delete')(context,{'id':source_id})
print 'Removed harvest source: %s' % source_id
def list_harvest_sources(self):
if len(self.args) >= 2 and self.args[1] == 'all':
sources = get_harvest_sources()
data_dict = {}
what = 'harvest source'
else:
sources = get_harvest_sources(active=True)
data_dict = {'only_active':True}
what = 'active harvest source'
context = {'model': model,'session':model.Session, 'user': self.admin_user['name']}
sources = get_action('harvest_source_list')(context,data_dict)
self.print_harvest_sources(sources)
self.print_there_are(what=what, sequence=sources)
@ -191,24 +205,24 @@ class Harvester(CkanCommand):
print 'Please provide a source id'
sys.exit(1)
job = create_harvest_job(source_id)
context = {'model': model,'session':model.Session, 'user': self.admin_user['name']}
job = get_action('harvest_job_create')(context,{'source_id':source_id})
self.print_harvest_job(job)
status = u'New'
jobs = get_harvest_jobs(status=status)
self.print_there_are('harvest jobs', jobs, condition=status)
jobs = get_action('harvest_job_list')(context,{'status':u'New'})
self.print_there_are('harvest jobs', jobs, condition=u'New')
def list_harvest_jobs(self):
jobs = get_harvest_jobs()
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
jobs = get_action('harvest_job_list')(context,{})
self.print_harvest_jobs(jobs)
self.print_there_are(what='harvest job', sequence=jobs)
def run_harvester(self):
try:
jobs = run_harvest_jobs()
except:
pass
sys.exit(0)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
jobs = get_action('harvest_jobs_run')(context,{})
#print 'Sent %s jobs to the gather queue' % len(jobs)
def import_stage(self):
@ -216,11 +230,15 @@ class Harvester(CkanCommand):
source_id = unicode(self.args[1])
else:
source_id = None
import_last_objects(source_id)
context = {'model': model, 'session':model.Session, 'user': self.admin_user['name']}
objs = get_action('harvest_objects_import')(context,{'source_id':source_id})
print '%s objects reimported' % len(objs)
def create_harvest_job_all(self):
jobs = create_harvest_job_all()
print "Created %s new harvest jobs" % len(jobs)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
jobs = get_action('harvest_job_create_all')(context,{})
print 'Created %s new harvest jobs' % len(jobs)
def print_harvest_sources(self, sources):
if sources:
@ -235,7 +253,7 @@ class Harvester(CkanCommand):
print ' active: %s' % source['active']
print ' user: %s' % source['user_id']
print 'publisher: %s' % source['publisher_id']
print ' jobs: %s' % len(source['jobs'])
print ' jobs: %s' % source['status']['job_count']
print ''
def print_harvest_jobs(self, jobs):
@ -247,8 +265,7 @@ class Harvester(CkanCommand):
def print_harvest_job(self, job):
print ' Job id: %s' % job['id']
print ' status: %s' % job['status']
print ' source: %s' % job['source']['id']
print ' url: %s' % job['source']['url']
print ' source: %s' % job['source']
print ' objects: %s' % len(job['objects'])
print 'gather_errors: %s' % len(job['gather_errors'])

View File

@ -2,34 +2,64 @@ from lxml import etree
from lxml.etree import XMLSyntaxError
from pylons.i18n import _
from ckan.authz import Authorizer
from ckan import model
from ckan.model.group import Group
import ckan.lib.helpers as h, json
from ckan.lib.base import BaseController, c, g, request, \
response, session, render, config, abort, redirect
from ckan.lib.navl.dictization_functions import DataError
from ckan.logic import NotFound, ValidationError
from ckan.logic import NotFound, ValidationError, get_action, NotAuthorized
from ckanext.harvest.logic.schema import harvest_source_form_schema
from ckanext.harvest.lib import create_harvest_source, edit_harvest_source, \
get_harvest_source, get_harvest_sources, \
create_harvest_job, get_registered_harvesters_info, \
get_harvest_object
from ckan.lib.helpers import Page
from ckan.lib.helpers import Page,pager_url
import logging
log = logging.getLogger(__name__)
class ViewController(BaseController):
def __before__(self, action, **env):
super(ViewController, self).__before__(action, **env)
# All calls to this controller must be with a sysadmin key
if not self.authorizer.is_sysadmin(c.user):
response_msg = _('Not authorized to see this page')
status = 401
abort(status, response_msg)
not_auth_message = _('Not authorized to see this page')
def __before__(self, action, **params):
super(ViewController,self).__before__(action, **params)
c.publisher_auth = (config.get('ckan.harvest.auth.profile',None) == 'publisher')
def _get_publishers(self):
groups = None
if c.publisher_auth:
if Authorizer().is_sysadmin(c.user):
groups = Group.all(group_type='publisher')
elif c.userobj:
groups = c.userobj.get_groups('publisher')
else: # anonymous user shouldn't have access to this page anyway.
groups = []
# Be explicit about which fields we make available in the template
groups = [ {
'name': g.name,
'id': g.id,
'title': g.title,
} for g in groups ]
return groups
def index(self):
context = {'model':model, 'user':c.user,'session':model.Session}
try:
# Request all harvest sources
c.sources = get_harvest_sources()
c.sources = get_action('harvest_source_list')(context,{})
except NotAuthorized,e:
abort(401,self.not_auth_message)
if c.publisher_auth:
c.sources = sorted(c.sources,key=lambda source : source['publisher_title'])
return render('index.html')
@ -41,8 +71,16 @@ class ViewController(BaseController):
data = data or {}
errors = errors or {}
error_summary = error_summary or {}
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()}
try:
context = {'model':model, 'user':c.user}
harvesters_info = get_action('harvesters_info_show')(context,{})
except NotAuthorized,e:
abort(401,self.not_auth_message)
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info}
c.groups = self._get_publishers()
c.form = render('source/new_source_form.html', extra_vars=vars)
return render('source/new.html')
@ -50,15 +88,19 @@ class ViewController(BaseController):
try:
data_dict = dict(request.params)
self._check_data_dict(data_dict)
context = {'model':model, 'user':c.user, 'session':model.Session,
'schema':harvest_source_form_schema()}
source = create_harvest_source(data_dict)
source = get_action('harvest_source_create')(context,data_dict)
# Create a harvest job for the new source
create_harvest_job(source['id'])
get_action('harvest_job_create')(context,{'source_id':source['id']})
h.flash_success(_('New harvest source added successfully.'
'A new harvest job for the source has also been created.'))
redirect(h.url_for('harvest'))
redirect('/harvest/%s' % source['id'])
except NotAuthorized,e:
abort(401,self.not_auth_message)
except DataError,e:
abort(400, 'Integrity Error')
except ValidationError,e:
@ -71,30 +113,46 @@ class ViewController(BaseController):
if ('save' in request.params) and not data:
return self._save_edit(id)
if not data:
try:
old_data = get_harvest_source(id)
context = {'model':model, 'user':c.user}
old_data = get_action('harvest_source_show')(context, {'id':id})
except NotFound:
abort(404, _('Harvest Source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
data = data or old_data
errors = errors or {}
error_summary = error_summary or {}
#TODO: Use new description interface to build the types select and descriptions
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()}
try:
context = {'model':model, 'user':c.user}
harvesters_info = get_action('harvesters_info_show')(context,{})
except NotAuthorized,e:
abort(401,self.not_auth_message)
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info}
c.groups = self._get_publishers()
c.form = render('source/new_source_form.html', extra_vars=vars)
return render('source/edit.html')
def _save_edit(self,id):
try:
data_dict = dict(request.params)
data_dict['id'] = id
self._check_data_dict(data_dict)
context = {'model':model, 'user':c.user, 'session':model.Session,
'schema':harvest_source_form_schema()}
source = edit_harvest_source(id,data_dict)
source = get_action('harvest_source_update')(context,data_dict)
h.flash_success(_('Harvest source edited successfully.'))
redirect(h.url_for('harvest'))
redirect('/harvest/%s' %id)
except NotAuthorized,e:
abort(401,self.not_auth_message)
except DataError,e:
abort(400, _('Integrity Error'))
except NotFound, e:
@ -106,45 +164,60 @@ class ViewController(BaseController):
def _check_data_dict(self, data_dict):
'''Check if the return data is correct'''
surplus_keys_schema = ['id','publisher_id','user_id','active','save','config']
surplus_keys_schema = ['id','publisher_id','user_id','config','save']
schema_keys = harvest_source_form_schema().keys()
keys_in_schema = set(schema_keys) - set(surplus_keys_schema)
# user_id is not yet used, we'll set the logged user one for the time being
if not data_dict.get('user_id',None):
if c.userobj:
data_dict['user_id'] = c.userobj.id
if keys_in_schema - set(data_dict.keys()):
log.info(_('Incorrect form fields posted'))
raise DataError(data_dict)
def read(self,id):
try:
c.source = get_harvest_source(id)
context = {'model':model, 'user':c.user}
c.source = get_action('harvest_source_show')(context, {'id':id})
c.page = Page(
collection=c.source['status']['packages'],
page=request.params.get('page', 1),
items_per_page=20
items_per_page=20,
url=pager_url
)
return render('source/read.html')
except NotFound:
abort(404,_('Harvest source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
def delete(self,id):
try:
delete_harvest_source(id)
context = {'model':model, 'user':c.user}
get_action('harvest_source_delete')(context, {'id':id})
h.flash_success(_('Harvesting source deleted successfully'))
h.flash_success(_('Harvesting source successfully inactivated'))
redirect(h.url_for('harvest'))
except NotFound:
abort(404,_('Harvest source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
def create_harvesting_job(self,id):
try:
create_harvest_job(id)
context = {'model':model, 'user':c.user, 'session':model.Session}
get_action('harvest_job_create')(context,{'source_id':id})
h.flash_success(_('Refresh requested, harvesting will take place within 15 minutes.'))
except NotFound:
abort(404,_('Harvest source not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
except Exception, e:
msg = 'An error occurred: [%s]' % e.message
h.flash_error(msg)
@ -152,23 +225,28 @@ class ViewController(BaseController):
redirect(h.url_for('harvest'))
def show_object(self,id):
try:
object = get_harvest_object(id)
context = {'model':model, 'user':c.user}
obj = get_action('harvest_object_show')(context, {'id':id})
# Check content type. It will probably be either XML or JSON
try:
etree.fromstring(object['content'])
etree.fromstring(obj['content'])
response.content_type = 'application/xml'
except XMLSyntaxError:
try:
json.loads(object['content'])
json.loads(obj['content'])
response.content_type = 'application/json'
except ValueError:
pass
response.headers["Content-Length"] = len(object['content'])
return object['content']
response.headers['Content-Length'] = len(obj['content'])
return obj['content']
except NotFound:
abort(404,_('Harvest object not found'))
except NotAuthorized,e:
abort(401,self.not_auth_message)
except Exception, e:
msg = 'An error occurred: [%s]' % e.message
h.flash_error(msg)

View File

@ -1,6 +1,8 @@
import logging
import re
from sqlalchemy.sql import update,and_, bindparam
from ckan import model
from ckan.model import Session, Package
from ckan.logic import ValidationError, NotFound, get_action
@ -28,6 +30,8 @@ class HarvesterBase(SingletonPlugin):
'''
implements(IHarvester)
config = None
def _gen_new_name(self,title):
'''
Creates a URL friendly name from a title
@ -145,12 +149,11 @@ class HarvesterBase(SingletonPlugin):
log.info('Package with GUID %s exists and needs to be updated' % harvest_object.guid)
# Update package
context.update({'id':package_dict['id']})
updated_package = get_action('package_update_rest')(context, package_dict)
new_package = get_action('package_update_rest')(context, package_dict)
harvest_object.package_id = updated_package['id']
harvest_object.save()
else:
log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid)
return
except NotFound:
# Package needs to be created
@ -161,6 +164,20 @@ class HarvesterBase(SingletonPlugin):
log.info('Package with GUID %s does not exist, let\'s create it' % harvest_object.guid)
new_package = get_action('package_create_rest')(context, package_dict)
harvest_object.package_id = new_package['id']
# Flag the other objects linking to this package as not current anymore
from ckanext.harvest.model import harvest_object_table
conn = Session.connection()
u = update(harvest_object_table) \
.where(harvest_object_table.c.package_id==bindparam('b_package_id')) \
.values(current=False)
conn.execute(u, b_package_id=new_package['id'])
Session.commit()
# Flag this as the current harvest object
harvest_object.package_id = new_package['id']
harvest_object.current = True
harvest_object.save()
return True

View File

@ -99,6 +99,11 @@ class CKANHarvester(HarvesterBase):
except NotFound,e:
raise ValueError('User not found')
for key in ('read_only','force_all'):
if key in config_obj:
if not isinstance(config_obj[key],bool):
raise ValueError('%s must be boolean' % key)
except ValueError,e:
raise e
@ -125,7 +130,8 @@ class CKANHarvester(HarvesterBase):
base_rest_url = base_url + self._get_rest_api_offset()
base_search_url = base_url + self._get_search_api_offset()
if previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0:
if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0):
if not self.config.get('force_all',False):
get_all_packages = False
# Request only the packages modified since last harvest job
@ -260,9 +266,11 @@ class CKANHarvester(HarvesterBase):
if isinstance(value,basestring):
value = value.format(harvest_source_id=harvest_object.job.source.id,
harvest_source_url=harvest_object.job.source.url.strip('/'),
harvest_source_title=harvest_object.job.source.title,
harvest_job_id=harvest_object.job.id,
harvest_object_id=harvest_object.id,
dataset_id=package_dict['id'])
package_dict['extras'][key] = value
result = self._create_or_update_package(package_dict,harvest_object)

View File

@ -1,406 +0,0 @@
import urlparse
import re
from sqlalchemy import distinct,func
from ckan.model import Session, repo
from ckan.model import Package
from ckan.lib.navl.dictization_functions import validate
from ckan.logic import NotFound, ValidationError
from ckanext.harvest.logic.schema import harvest_source_form_schema
from ckan.plugins import PluginImplementations
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
HarvestGatherError, HarvestObjectError
from ckanext.harvest.queue import get_gather_publisher
from ckanext.harvest.interfaces import IHarvester
import logging
log = logging.getLogger('ckanext')
def _get_source_status(source, detailed=True):
out = dict()
job_count = HarvestJob.filter(source=source).count()
if not job_count:
out['msg'] = 'No jobs yet'
return out
out = {'next_harvest':'',
'last_harvest_request':'',
'last_harvest_statistics':{'added':0,'updated':0,'errors':0},
'last_harvest_errors':{'gather':[],'object':[]},
'overall_statistics':{'added':0, 'errors':0},
'packages':[]}
# Get next scheduled job
next_job = HarvestJob.filter(source=source,status=u'New').first()
if next_job:
out['next_harvest'] = 'Scheduled'
else:
out['next_harvest'] = 'Not yet scheduled'
# Get the last finished job
last_job = HarvestJob.filter(source=source,status=u'Finished') \
.order_by(HarvestJob.created.desc()).first()
if last_job:
#TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished)
#Get HarvestObjects from last job whit links to packages
if detailed:
last_objects = [obj for obj in last_job.objects if obj.package is not None]
if len(last_objects) == 0:
# No packages added or updated
out['last_harvest_statistics']['added'] = 0
out['last_harvest_statistics']['updated'] = 0
else:
# Check wether packages were added or updated
for last_object in last_objects:
# Check if the same package had been linked before
previous_objects = Session.query(HarvestObject) \
.filter(HarvestObject.package==last_object.package) \
.count()
if previous_objects == 1:
# It didn't previously exist, it has been added
out['last_harvest_statistics']['added'] += 1
else:
# Pacakge already existed, but it has been updated
out['last_harvest_statistics']['updated'] += 1
# Last harvest errors
# We have the gathering errors in last_job.gather_errors, so let's also
# get also the object errors.
object_errors = Session.query(HarvestObjectError).join(HarvestObject) \
.filter(HarvestObject.job==last_job)
out['last_harvest_statistics']['errors'] = len(last_job.gather_errors) \
+ object_errors.count()
if detailed:
for gather_error in last_job.gather_errors:
out['last_harvest_errors']['gather'].append(gather_error.message)
for object_error in object_errors:
err = {'object_id':object_error.object.id,'object_guid':object_error.object.guid,'message': object_error.message}
out['last_harvest_errors']['object'].append(err)
# Overall statistics
packages = Session.query(distinct(HarvestObject.package_id),Package.name) \
.join(Package).join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source)
out['overall_statistics']['added'] = packages.count()
if detailed:
for package in packages:
out['packages'].append(package.name)
gather_errors = Session.query(HarvestGatherError) \
.join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
object_errors = Session.query(HarvestObjectError) \
.join(HarvestObject).join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
out['overall_statistics']['errors'] = gather_errors + object_errors
else:
out['last_harvest_request'] = 'Not yet harvested'
return out
def _source_as_dict(source, detailed=True):
out = source.as_dict()
out['jobs'] = []
for job in source.jobs:
out['jobs'].append(job.as_dict())
out['status'] = _get_source_status(source, detailed=detailed)
return out
def _job_as_dict(job):
out = job.as_dict()
out['source'] = job.source.as_dict()
out['objects'] = []
out['gather_errors'] = []
for obj in job.objects:
out['objects'].append(obj.as_dict())
for error in job.gather_errors:
out['gather_errors'].append(error.as_dict())
return out
def _object_as_dict(obj):
out = obj.as_dict()
out['source'] = obj.source.as_dict()
out['job'] = obj.job.as_dict()
if obj.package:
out['package'] = obj.package.as_dict()
out['errors'] = []
for error in obj.errors:
out['errors'].append(error.as_dict())
return out
def _url_exists(url):
new_url = _normalize_url(url)
existing_sources = get_harvest_sources()
for existing_source in existing_sources:
existing_url = _normalize_url(existing_source['url'])
if existing_url == new_url and existing_source['active'] == True:
return existing_source
return False
def _normalize_url(url):
o = urlparse.urlparse(url)
# Normalize port
if ':' in o.netloc:
parts = o.netloc.split(':')
if (o.scheme == 'http' and parts[1] == '80') or \
(o.scheme == 'https' and parts[1] == '443'):
netloc = parts[0]
else:
netloc = ':'.join(parts)
else:
netloc = o.netloc
# Remove trailing slash
path = o.path.rstrip('/')
check_url = urlparse.urlunparse((
o.scheme,
netloc,
path,
None,None,None))
return check_url
def _prettify(field_name):
field_name = re.sub('(?<!\w)[Uu]rl(?!\w)', 'URL', field_name.replace('_', ' ').capitalize())
return field_name.replace('_', ' ')
def _error_summary(error_dict):
error_summary = {}
for key, error in error_dict.iteritems():
error_summary[_prettify(key)] = error[0]
return error_summary
def get_harvest_source(id,attr=None):
source = HarvestSource.get(id,attr=attr)
if not source:
raise NotFound
return _source_as_dict(source)
def get_harvest_sources(**kwds):
sources = HarvestSource.filter(**kwds) \
.order_by(HarvestSource.created.desc()) \
.all()
return [_source_as_dict(source, detailed=False) for source in sources]
def create_harvest_source(data_dict):
schema = harvest_source_form_schema()
data, errors = validate(data_dict, schema)
if errors:
Session.rollback()
raise ValidationError(errors,_error_summary(errors))
source = HarvestSource()
source.url = data['url']
source.type = data['type']
opt = ['active','description','user_id','publisher_id','config']
for o in opt:
if o in data and data[o] is not None:
source.__setattr__(o,data[o])
source.save()
return _source_as_dict(source)
def edit_harvest_source(source_id,data_dict):
schema = harvest_source_form_schema()
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
# Add source id to the dict, as some validators will need it
data_dict['id'] = source.id
data, errors = validate(data_dict, schema)
if errors:
Session.rollback()
raise ValidationError(errors,_error_summary(errors))
fields = ['url','type','active','description','user_id','publisher_id']
for f in fields:
if f in data_dict and data_dict[f] is not None and data_dict[f] != '':
source.__setattr__(f,data_dict[f])
source.config = data_dict['config']
source.save()
return _source_as_dict(source)
def remove_harvest_source(source_id):
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
# Don't actually delete the record, just flag it as inactive
source.active = False
source.save()
# Abort any pending jobs
jobs = HarvestJob.filter(source=source,status=u'New')
if jobs:
for job in jobs:
job.status = u'Aborted'
job.save()
return True
def get_harvest_job(id,attr=None):
job = HarvestJob.get(id,attr=attr)
if not job:
raise NotFound
return _job_as_dict(job)
def get_harvest_jobs(**kwds):
jobs = HarvestJob.filter(**kwds).all()
return [_job_as_dict(job) for job in jobs]
def create_harvest_job(source_id):
# Check if source exists
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
# Check if the source is active
if not source.active:
raise Exception('Can not create jobs on inactive sources')
# Check if there already is an unrun job for this source
exists = get_harvest_jobs(source=source,status=u'New')
if len(exists):
raise Exception('There already is an unrun job for this source')
job = HarvestJob()
job.source = source
job.save()
return _job_as_dict(job)
def run_harvest_jobs():
# Check if there are pending harvest jobs
jobs = get_harvest_jobs(status=u'New')
if len(jobs) == 0:
raise Exception('There are no new harvesting jobs')
# Send each job to the gather queue
publisher = get_gather_publisher()
sent_jobs = []
for job in jobs:
if job['source']['active']:
publisher.send({'harvest_job_id': job['id']})
log.info('Sent job %s to the gather queue' % job['id'])
sent_jobs.append(job)
publisher.close()
return sent_jobs
def get_harvest_object(id,attr=None):
obj = HarvestObject.get(id,attr=attr)
if not obj:
raise NotFound
return _object_as_dict(obj)
def get_harvest_objects(**kwds):
objects = HarvestObject.filter(**kwds).all()
return [_object_as_dict(obj) for obj in objects]
def import_last_objects(source_id=None):
if source_id:
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
last_objects_ids = Session.query(HarvestObject.id) \
.join(HarvestJob) \
.filter(HarvestJob.source==source) \
.filter(HarvestObject.package!=None) \
.order_by(HarvestObject.guid) \
.order_by(HarvestObject.metadata_modified_date.desc()) \
.order_by(HarvestObject.gathered.desc()) \
.all()
else:
last_objects_ids = Session.query(HarvestObject.id) \
.filter(HarvestObject.package!=None) \
.order_by(HarvestObject.guid) \
.order_by(HarvestObject.metadata_modified_date.desc()) \
.order_by(HarvestObject.gathered.desc()) \
.all()
last_obj_guid = ''
imported_objects = []
for obj_id in last_objects_ids:
obj = Session.query(HarvestObject).get(obj_id)
if obj.guid != last_obj_guid:
imported_objects.append(obj)
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == obj.job.source.type:
if hasattr(harvester,'force_import'):
harvester.force_import = True
harvester.import_stage(obj)
break
last_obj_guid = obj.guid
return imported_objects
def create_harvest_job_all():
# Get all active sources
sources = get_harvest_sources(active=True)
jobs = []
# Create a new job for each
for source in sources:
job = create_harvest_job(source['id'])
jobs.append(job)
return jobs
def get_registered_harvesters_info():
available_harvesters = []
for harvester in PluginImplementations(IHarvester):
info = harvester.info()
if not info or 'name' not in info:
log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
continue
info['show_config'] = (info.get('form_config_interface','') == 'Text')
available_harvesters.append(info)
return available_harvesters

View File

@ -0,0 +1,7 @@
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -0,0 +1,106 @@
import re
from ckan.logic import NotFound, ValidationError, check_access
from ckan.lib.navl.dictization_functions import validate
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.schema import default_harvest_source_schema
from ckanext.harvest.logic.dictization import (harvest_source_dictize,
harvest_job_dictize)
from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list
def harvest_source_create(context,data_dict):
check_access('harvest_source_create',context,data_dict)
model = context['model']
session = context['session']
schema = context.get('schema') or default_harvest_source_schema()
data, errors = validate(data_dict, schema)
if errors:
session.rollback()
raise ValidationError(errors,_error_summary(errors))
source = HarvestSource()
source.url = data['url']
source.type = data['type']
opt = ['active','title','description','user_id','publisher_id','config']
for o in opt:
if o in data and data[o] is not None:
source.__setattr__(o,data[o])
if 'active' in data_dict:
source.active = data['active']
source.save()
return harvest_source_dictize(source,context)
def harvest_job_create(context,data_dict):
check_access('harvest_job_create',context,data_dict)
source_id = data_dict['source_id']
# Check if source exists
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
# Check if the source is active
if not source.active:
raise Exception('Can not create jobs on inactive sources')
# Check if there already is an unrun job for this source
data_dict ={
'source_id':source_id,
'status':u'New'
}
exists = harvest_job_list(context,data_dict)
if len(exists):
raise Exception('There already is an unrun job for this source')
job = HarvestJob()
job.source = source
job.save()
return harvest_job_dictize(job,context)
def harvest_job_create_all(context,data_dict):
check_access('harvest_job_create_all',context,data_dict)
data_dict.update({'only_active':True})
# Get all active sources
sources = harvest_source_list(context,data_dict)
jobs = []
# Create a new job for each, if there isn't already one
for source in sources:
data_dict ={
'source_id':source['id'],
'status':u'New'
}
exists = harvest_job_list(context,data_dict)
if len(exists):
continue
job = harvest_job_create(context,{'source_id':source['id']})
jobs.append(job)
return jobs
def _error_summary(error_dict):
error_summary = {}
for key, error in error_dict.iteritems():
error_summary[_prettify(key)] = error[0]
return error_summary
def _prettify(field_name):
field_name = re.sub('(?<!\w)[Uu]rl(?!\w)', 'URL', field_name.replace('_', ' ').capitalize())
return field_name.replace('_', ' ')

View File

@ -0,0 +1,26 @@
from ckan.logic import NotFound, check_access
from ckanext.harvest.model import (HarvestSource, HarvestJob)
def harvest_source_delete(context,data_dict):
check_access('harvest_source_delete',context,data_dict)
source_id = data_dict.get('id')
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
# Don't actually delete the record, just flag it as inactive
source.active = False
source.save()
# Abort any pending jobs
jobs = HarvestJob.filter(source=source,status=u'New')
if jobs:
for job in jobs:
job.status = u'Aborted'
job.save()
return True

View File

@ -0,0 +1,162 @@
from sqlalchemy import or_
from ckan.authz import Authorizer
from ckan.model import User
from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester
from ckan.logic import NotFound, check_access
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.dictization import (harvest_source_dictize,
harvest_job_dictize,
harvest_object_dictize)
def harvest_source_show(context,data_dict):
check_access('harvest_source_show',context,data_dict)
id = data_dict.get('id')
attr = data_dict.get('attr',None)
source = HarvestSource.get(id,attr=attr)
if not source:
raise NotFound
return harvest_source_dictize(source,context)
def harvest_source_list(context, data_dict):
check_access('harvest_source_list',context,data_dict)
model = context['model']
session = context['session']
user = context.get('user','')
sources = _get_sources_for_user(context, data_dict)
context.update({'detailed':False})
return [harvest_source_dictize(source, context) for source in sources]
def harvest_job_show(context,data_dict):
check_access('harvest_job_show',context,data_dict)
id = data_dict.get('id')
attr = data_dict.get('attr',None)
job = HarvestJob.get(id,attr=attr)
if not job:
raise NotFound
return harvest_job_dictize(job,context)
def harvest_job_list(context,data_dict):
check_access('harvest_job_list',context,data_dict)
model = context['model']
session = context['session']
source_id = data_dict.get('source_id',False)
status = data_dict.get('status',False)
query = session.query(HarvestJob)
if source_id:
query = query.filter(HarvestJob.source_id==source_id)
if status:
query = query.filter(HarvestJob.status==status)
jobs = query.all()
return [harvest_job_dictize(job,context) for job in jobs]
def harvest_object_show(context,data_dict):
check_access('harvest_object_show',context,data_dict)
id = data_dict.get('id')
attr = data_dict.get('attr',None)
obj = HarvestObject.get(id,attr=attr)
if not obj:
raise NotFound
return harvest_object_dictize(obj,context)
def harvest_object_list(context,data_dict):
check_access('harvest_object_list',context,data_dict)
model = context['model']
session = context['session']
only_current = data_dict.get('only_current',True)
source_id = data_dict.get('source_id',False)
query = session.query(HarvestObject)
if source_id:
query = query.filter(HarvestObject.source_id==source_id)
if only_current:
query = query.filter(HarvestObject.current==True)
objects = query.all()
return [getattr(obj,'id') for obj in objects]
def harvesters_info_show(context,data_dict):
check_access('harvesters_info_show',context,data_dict)
available_harvesters = []
for harvester in PluginImplementations(IHarvester):
info = harvester.info()
if not info or 'name' not in info:
log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
continue
info['show_config'] = (info.get('form_config_interface','') == 'Text')
available_harvesters.append(info)
return available_harvesters
def _get_sources_for_user(context,data_dict):
model = context['model']
session = context['session']
user = context.get('user','')
only_active = data_dict.get('only_active',False)
query = session.query(HarvestSource) \
.order_by(HarvestSource.created.desc())
if only_active:
query = query.filter(HarvestSource.active==True) \
# Sysadmins will get all sources
if not Authorizer().is_sysadmin(user):
# This only applies to a non sysadmin user when using the
# publisher auth profile. When using the default profile,
# normal users will never arrive at this point, but even if they
# do, they will get an empty list.
user_obj = User.get(user)
publisher_filters = []
for publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
publisher_filters.append(HarvestSource.publisher_id==publisher_id)
if len(publisher_filters):
query = query.filter(or_(*publisher_filters))
else:
# This user does not belong to a publisher yet, no sources for him/her
return []
sources = query.all()
return sources

View File

@ -0,0 +1,136 @@
import logging
from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester
from ckan.model import Package
from ckan.logic import NotFound, ValidationError, check_access
from ckan.lib.navl.dictization_functions import validate
from ckanext.harvest.queue import get_gather_publisher
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.schema import default_harvest_source_schema
from ckanext.harvest.logic.dictization import (harvest_source_dictize,harvest_object_dictize)
from ckanext.harvest.logic.action.create import _error_summary
from ckanext.harvest.logic.action.get import harvest_source_show,harvest_job_list
log = logging.getLogger(__name__)
def harvest_source_update(context,data_dict):
check_access('harvest_source_update',context,data_dict)
model = context['model']
session = context['session']
source_id = data_dict.get('id')
schema = context.get('schema') or default_harvest_source_schema()
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
data, errors = validate(data_dict, schema)
if errors:
session.rollback()
raise ValidationError(errors,_error_summary(errors))
fields = ['url','title','type','description','user_id','publisher_id']
for f in fields:
if f in data and data[f] is not None:
source.__setattr__(f,data[f])
if 'active' in data_dict:
source.active = data['active']
if 'config' in data_dict:
source.config = data['config']
source.save()
# Abort any pending jobs
if not source.active:
jobs = HarvestJob.filter(source=source,status=u'New')
if jobs:
for job in jobs:
job.status = u'Aborted'
job.save()
return harvest_source_dictize(source,context)
def harvest_objects_import(context,data_dict):
'''
Reimports the current harvest objects
It performs the import stage with the last fetched objects, optionally
belonging to a certain source.
Please note that no objects will be fetched from the remote server.
It will only affect the last fetched objects already present in the
database.
'''
check_access('harvest_objects_import',context,data_dict)
model = context['model']
session = context['session']
source_id = data_dict.get('source_id',None)
if source_id:
source = HarvestSource.get(source_id)
if not source:
raise NotFound('Harvest source %s does not exist' % source_id)
if not source.active:
raise Exception('This harvest source is not active')
last_objects_ids = session.query(HarvestObject.id) \
.join(HarvestSource).join(Package) \
.filter(HarvestObject.source==source) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active') \
.all()
else:
last_objects_ids = session.query(HarvestObject.id) \
.join(Package) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active') \
.all()
last_objects = []
for obj_id in last_objects_ids:
obj = session.query(HarvestObject).get(obj_id)
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == obj.source.type:
if hasattr(harvester,'force_import'):
harvester.force_import = True
harvester.import_stage(obj)
break
last_objects.append(harvest_object_dictize(obj,context))
return last_objects
def harvest_jobs_run(context,data_dict):
check_access('harvest_jobs_run',context,data_dict)
source_id = data_dict.get('source_id',None)
# Check if there are pending harvest jobs
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
if len(jobs) == 0:
raise Exception('There are no new harvesting jobs')
# Send each job to the gather queue
publisher = get_gather_publisher()
sent_jobs = []
for job in jobs:
source = harvest_source_show(context,{'id':job['source']})
if source['active']:
publisher.send({'harvest_job_id': job['id']})
log.info('Sent job %s to the gather queue' % job['id'])
sent_jobs.append(job)
publisher.close()
return sent_jobs

View File

@ -0,0 +1,39 @@
from ckan.logic import NotFound
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
def get_source_object(context, data_dict = {}):
if not 'source' in context:
model = context['model']
id = data_dict.get('id',None)
source = HarvestSource.get(id)
if not source:
raise NotFound
else:
source = context['source']
return source
def get_job_object(context, data_dict = {}):
if not 'job' in context:
model = context['model']
id = data_dict.get('id',None)
job = HarvestJob.get(id)
if not job:
raise NotFound
else:
job = context['job']
return job
def get_obj_object(context, data_dict = {}):
if not 'obj' in context:
model = context['model']
id = data_dict.get('id',None)
obj = HarvestObject.get(id)
if not obj:
raise NotFound
else:
obj = context['obj']
return obj

View File

@ -0,0 +1,30 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_create(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to create harvest sources') % str(user)}
else:
return {'success': True}
def harvest_job_create(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to create harvest jobs') % str(user)}
else:
return {'success': True}
def harvest_job_create_all(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to create harvest jobs for all sources') % str(user)}
else:
return {'success': True}

View File

@ -0,0 +1,13 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_delete(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to delete harvest sources') % str(user)}
else:
return {'success': True}

View File

@ -0,0 +1,67 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_show(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to read this harvest source') % str(user)}
else:
return {'success': True}
def harvest_source_list(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to see the harvest sources') % str(user)}
else:
return {'success': True}
def harvest_job_show(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to read this harvest job') % str(user)}
else:
return {'success': True}
def harvest_job_list(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to see the harvest jobs') % str(user)}
else:
return {'success': True}
def harvest_object_show(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to read this harvest object') % str(user)}
else:
return {'success': True}
def harvest_object_list(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to see the harvest objects') % str(user)}
else:
return {'success': True}
def harvesters_info_show(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to see the harvesters information') % str(user)}
else:
return {'success': True}

View File

@ -0,0 +1,7 @@
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -0,0 +1,53 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.model import HarvestSource
def harvest_source_create(context,data_dict):
model = context['model']
user = context.get('user','')
# Non-logged users can not create sources
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to create harvest sources')}
# Sysadmins and the rest of logged users can create sources,
# as long as they belong to a publisher
user_obj = User.get(user)
if not user_obj or not Authorizer().is_sysadmin(user) and len(user_obj.get_groups(u'publisher',u'admin')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to create harvest sources') % str(user)}
else:
return {'success': True}
def harvest_job_create(context,data_dict):
model = context['model']
user = context.get('user')
source_id = data_dict['source_id']
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to create harvest jobs')}
if Authorizer().is_sysadmin(user):
return {'success': True}
user_obj = User.get(user)
source = HarvestSource.get(source_id)
if not source:
raise NotFound
if not user_obj or not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to create a job for source %s') % (str(user),source.id)}
else:
return {'success': True}
def harvest_job_create_all(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('Only sysadmins can create harvest jobs for all sources') % str(user)}
else:
return {'success': True}

View File

@ -0,0 +1,27 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.logic.auth import get_source_object
def harvest_source_delete(context,data_dict):
model = context['model']
user = context.get('user','')
source = get_source_object(context,data_dict)
# Non-logged users can not delete this source
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to delete harvest sources')}
# Sysadmins can delete the source
if Authorizer().is_sysadmin(user):
return {'success': True}
# Check if the source publisher id exists on the user's groups
user_obj = User.get(user)
if not user_obj or not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to delete harvest source %s') % (str(user),source.id)}
else:
return {'success': True}

View File

@ -0,0 +1,160 @@
from ckan.lib.base import _
from ckan.logic import NotFound
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.model import HarvestSource
from ckanext.harvest.logic.auth import get_source_object, get_job_object, get_obj_object
def harvest_source_show(context,data_dict):
model = context['model']
user = context.get('user','')
source = get_source_object(context,data_dict)
# Non-logged users can not read the source
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to see harvest sources')}
# Sysadmins can read the source
if Authorizer().is_sysadmin(user):
return {'success': True}
# Check if the source publisher id exists on the user's groups
user_obj = User.get(user)
if not user_obj or not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to read harvest source %s') % (str(user),source.id)}
else:
return {'success': True}
def harvest_source_list(context,data_dict):
model = context['model']
user = context.get('user')
# Here we will just check that the user is logged in.
# The logic action will return an empty list if the user does not
# have permissons on any source.
if not user:
return {'success': False, 'msg': _('Only logged users are authorized to see their sources')}
else:
user_obj = User.get(user)
# Only users belonging to a publisher can list sources,
# unless they are sysadmins
if not user_obj or not Authorizer().is_sysadmin(user) and len(user_obj.get_groups(u'publisher',u'admin')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to list harvest sources') % str(user)}
else:
return {'success': True}
def harvest_job_show(context,data_dict):
model = context['model']
user = context.get('user')
job = get_job_object(context,data_dict)
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to see harvest jobs')}
if Authorizer().is_sysadmin(user):
return {'success': True}
user_obj = User.get(user)
if not user_obj or not job.source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to read harvest job %s') % (str(user),job.id)}
else:
return {'success': True}
def harvest_job_list(context,data_dict):
model = context['model']
user = context.get('user')
# Check user is logged in
if not user:
return {'success': False, 'msg': _('Only logged users are authorized to see their sources')}
user_obj = User.get(user)
# Checks for non sysadmin users
if not Authorizer().is_sysadmin(user):
if not user_obj or len(user_obj.get_groups(u'publisher',u'admin')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to list harvest jobs') % str(user)}
source_id = data_dict.get('source_id',False)
if not source_id:
return {'success': False, 'msg': _('Only sysadmins can list all harvest jobs') % str(user)}
source = HarvestSource.get(source_id)
if not source:
raise NotFound
if not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to list jobs from source %s') % (str(user),source.id)}
return {'success': True}
def harvest_object_show(context,data_dict):
model = context['model']
user = context.get('user')
obj = get_obj_object(context,data_dict)
if context.get('ignore_auth', False):
return {'success': True}
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to see harvest objects')}
if Authorizer().is_sysadmin(user):
return {'success': True}
user_obj = User.get(user)
if not user_obj or not obj.source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to read harvest object %s') % (str(user),obj.id)}
else:
return {'success': True}
def harvest_object_list(context,data_dict):
model = context['model']
user = context.get('user')
# Check user is logged in
if not user:
return {'success': False, 'msg': _('Only logged users are authorized to see their sources')}
user_obj = User.get(user)
# Checks for non sysadmin users
if not Authorizer().is_sysadmin(user):
if not user_obj or len(user_obj.get_groups(u'publisher',u'admin')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to list harvest objects') % str(user)}
source_id = data_dict.get('source_id',False)
if not source_id:
return {'success': False, 'msg': _('Only sysadmins can list all harvest objects') % str(user)}
source = HarvestSource.get(source_id)
if not source:
raise NotFound
if not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to list objects from source %s') % (str(user),source.id)}
return {'success': True}
def harvesters_info_show(context,data_dict):
model = context['model']
user = context.get('user','')
# Non-logged users can not create sources
if not user:
return {'success': False, 'msg': _('Non-logged in users can not see the harvesters info')}
# Sysadmins and the rest of logged users can see the harvesters info,
# as long as they belong to a publisher
user_obj = User.get(user)
if not user_obj or not Authorizer().is_sysadmin(user) and len(user_obj.get_groups(u'publisher',u'admin')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to see the harvesters info') % str(user)}
else:
return {'success': True}

View File

@ -0,0 +1,83 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
from ckan.model import User
from ckanext.harvest.logic.auth import get_source_object
def harvest_source_update(context,data_dict):
model = context['model']
user = context.get('user','')
source = get_source_object(context,data_dict)
# Non-logged users can not update this source
if not user:
return {'success': False, 'msg': _('Non-logged in users are not authorized to update harvest sources')}
# Sysadmins can update the source
if Authorizer().is_sysadmin(user):
return {'success': True}
# Check if the source publisher id exists on the user's groups
user_obj = User.get(user)
if not user_obj or not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to update harvest source %s') % (str(user),source.id)}
else:
return {'success': True}
def harvest_objects_import(context,data_dict):
model = context['model']
user = context.get('user')
# Check user is logged in
if not user:
return {'success': False, 'msg': _('Only logged users are authorized to reimport harvest objects')}
user_obj = User.get(user)
# Checks for non sysadmin users
if not Authorizer().is_sysadmin(user):
if not user_obj or len(user_obj.get_groups(u'publisher',u'admin')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to reimport harvest objects') % str(user)}
source_id = data_dict.get('source_id',False)
if not source_id:
return {'success': False, 'msg': _('Only sysadmins can reimport all harvest objects') % str(user)}
source = HarvestSource.get(source_id)
if not source:
raise NotFound
if not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to reimport objects from source %s') % (str(user),source.id)}
return {'success': True}
def harvest_jobs_run(context,data_dict):
model = context['model']
user = context.get('user')
# Check user is logged in
if not user:
return {'success': False, 'msg': _('Only logged users are authorized to run harvest jobs')}
user_obj = User.get(user)
# Checks for non sysadmin users
if not Authorizer().is_sysadmin(user):
if not user_obj or len(user_obj.get_groups(u'publisher',u'admin')) == 0:
return {'success': False, 'msg': _('User %s must belong to a publisher to run harvest jobs') % str(user)}
source_id = data_dict.get('source_id',False)
if not source_id:
return {'success': False, 'msg': _('Only sysadmins can run all harvest jobs') % str(user)}
source = HarvestSource.get(source_id)
if not source:
raise NotFound
if not source.publisher_id in [g.id for g in user_obj.get_groups(u'publisher',u'admin')]:
return {'success': False, 'msg': _('User %s not authorized to run jobs from source %s') % (str(user),source.id)}
return {'success': True}

View File

@ -0,0 +1,30 @@
from ckan.lib.base import _
from ckan.authz import Authorizer
def harvest_source_update(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to update harvest sources') % str(user)}
else:
return {'success': True}
def harvest_objects_import(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to reimport harvest objects') % str(user)}
else:
return {'success': True}
def harvest_jobs_run(context,data_dict):
model = context['model']
user = context.get('user')
if not Authorizer().is_sysadmin(user):
return {'success': False, 'msg': _('User %s not authorized to run the pending harvest jobs') % str(user)}
else:
return {'success': True}

View File

@ -0,0 +1,153 @@
from sqlalchemy import distinct
from ckan.model import Package,Group
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
HarvestGatherError, HarvestObjectError
def harvest_source_dictize(source, context):
out = source.as_dict()
out['publisher_title'] = u''
publisher_id = out.get('publisher_id')
if publisher_id:
group = Group.get(publisher_id)
if group:
out['publisher_title'] = group.title
out['status'] = _get_source_status(source, context)
return out
def harvest_job_dictize(job, context):
out = job.as_dict()
out['source'] = job.source_id
out['objects'] = []
out['gather_errors'] = []
for obj in job.objects:
out['objects'].append(obj.as_dict())
for error in job.gather_errors:
out['gather_errors'].append(error.as_dict())
return out
def harvest_object_dictize(obj, context):
out = obj.as_dict()
out['source'] = obj.harvest_source_id
out['job'] = obj.harvest_job_id
if obj.package:
out['package'] = obj.package.id
out['errors'] = []
for error in obj.errors:
out['errors'].append(error.as_dict())
return out
def _get_source_status(source, context):
model = context.get('model')
detailed = context.get('detailed',True)
out = dict()
job_count = HarvestJob.filter(source=source).count()
if not job_count:
out['msg'] = 'No jobs yet'
return out
out = {
'job_count': job_count,
'next_harvest':'',
'last_harvest_request':'',
'last_harvest_statistics':{'added':0,'updated':0,'errors':0},
'last_harvest_errors':{'gather':[],'object':[]},
'overall_statistics':{'added':0, 'errors':0},
'packages':[]}
# Get next scheduled job
next_job = HarvestJob.filter(source=source,status=u'New').first()
if next_job:
out['next_harvest'] = 'Scheduled'
else:
out['next_harvest'] = 'Not yet scheduled'
# Get the last finished job
last_job = HarvestJob.filter(source=source,status=u'Finished') \
.order_by(HarvestJob.created.desc()).first()
if last_job:
#TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished)
#Get HarvestObjects from last job whit links to packages
if detailed:
last_objects = [obj for obj in last_job.objects if obj.package is not None]
if len(last_objects) == 0:
# No packages added or updated
out['last_harvest_statistics']['added'] = 0
out['last_harvest_statistics']['updated'] = 0
else:
# Check wether packages were added or updated
for last_object in last_objects:
# Check if the same package had been linked before
previous_objects = model.Session.query(HarvestObject) \
.filter(HarvestObject.package==last_object.package) \
.count()
if previous_objects == 1:
# It didn't previously exist, it has been added
out['last_harvest_statistics']['added'] += 1
else:
# Pacakge already existed, but it has been updated
out['last_harvest_statistics']['updated'] += 1
# Last harvest errors
# We have the gathering errors in last_job.gather_errors, so let's also
# get also the object errors.
object_errors = model.Session.query(HarvestObjectError).join(HarvestObject) \
.filter(HarvestObject.job==last_job)
out['last_harvest_statistics']['errors'] = len(last_job.gather_errors) \
+ object_errors.count()
if detailed:
for gather_error in last_job.gather_errors:
out['last_harvest_errors']['gather'].append(gather_error.message)
for object_error in object_errors:
err = {'object_id':object_error.object.id,'object_guid':object_error.object.guid,'message': object_error.message}
out['last_harvest_errors']['object'].append(err)
# Overall statistics
packages = model.Session.query(distinct(HarvestObject.package_id),Package.name) \
.join(Package).join(HarvestSource) \
.filter(HarvestObject.source==source) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active')
out['overall_statistics']['added'] = packages.count()
if detailed:
for package in packages:
out['packages'].append(package.name)
gather_errors = model.Session.query(HarvestGatherError) \
.join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
object_errors = model.Session.query(HarvestObjectError) \
.join(HarvestObject).join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source).count()
out['overall_statistics']['errors'] = gather_errors + object_errors
else:
out['last_harvest_request'] = 'Not yet harvested'
return out

View File

@ -1,3 +1,5 @@
from ckan.lib.base import config
from ckan.lib.navl.validators import (ignore_missing,
not_empty,
empty,
@ -5,10 +7,11 @@ from ckan.lib.navl.validators import (ignore_missing,
not_missing
)
from ckanext.harvest.logic.validators import harvest_source_id_exists, \
harvest_source_url_validator, \
harvest_source_type_exists, \
harvest_source_config_validator
from ckanext.harvest.logic.validators import (harvest_source_id_exists,
harvest_source_url_validator,
harvest_source_type_exists,
harvest_source_config_validator,
harvest_source_active_validator,)
def default_harvest_source_schema():
@ -16,13 +19,18 @@ def default_harvest_source_schema():
'id': [ignore_missing, unicode, harvest_source_id_exists],
'url': [not_empty, unicode, harvest_source_url_validator],
'type': [not_empty, unicode, harvest_source_type_exists],
'description': [ignore_missing],
'active': [ignore_missing],
'user_id': [ignore_missing],
'publisher_id': [ignore_missing],
'title': [ignore_missing,unicode],
'description': [ignore_missing,unicode],
'active': [ignore_missing,harvest_source_active_validator],
'user_id': [ignore_missing,unicode],
'config': [ignore_missing,harvest_source_config_validator]
}
if config.get('ckan.harvest.auth.profile',None) == 'publisher':
schema['publisher_id'] = [not_empty,unicode]
else:
schema['publisher_id'] = [ignore_missing,unicode]
return schema

View File

@ -55,8 +55,8 @@ def harvest_source_url_validator(key,data,errors,context):
for url,active in existing_sources:
url = _normalize_url(url)
if url == new_url and active == True:
raise Invalid('There already is an active Harvest Source for this URL: %s' % data[key])
if url == new_url:
raise Invalid('There already is a Harvest Source for this URL: %s' % data[key])
return data[key]
@ -91,3 +91,11 @@ def harvest_source_config_validator(key,data,errors,context):
else:
return data[key]
def harvest_source_active_validator(value,context):
if isinstance(value,basestring):
if value.lower() == 'true':
return True
else:
return False
return bool(value)

View File

@ -1,14 +1,22 @@
import logging
import datetime
from sqlalchemy import event
from sqlalchemy import distinct
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import backref, relation
from ckan import model
from ckan.model.meta import *
from ckan.model.meta import (metadata, mapper, Session,
Table, Column, ForeignKey, types)
from ckan.model.types import make_uuid
from ckan.model.core import *
from ckan.model.domain_object import DomainObject
from ckan.model.package import Package
from sqlalchemy.orm import backref, relation
log = logging.getLogger(__name__)
__all__ = [
@ -27,12 +35,34 @@ harvest_gather_error_table = None
harvest_object_error_table = None
def setup():
if harvest_source_table is None:
create_harvester_tables()
define_harvester_tables()
log.debug('Harvest tables defined in memory')
if model.repo.are_tables_created():
metadata.create_all()
if not harvest_source_table.exists():
# Create each table individually rather than
# using metadata.create_all()
harvest_source_table.create()
harvest_job_table.create()
harvest_object_table.create()
harvest_gather_error_table.create()
harvest_object_error_table.create()
log.debug('Harvest tables created')
else:
from ckan.model.meta import engine
log.debug('Harvest tables already exist')
# Check if existing tables need to be updated
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
if not 'title' in [column['name'] for column in columns]:
log.debug('Harvest tables need to be updated')
migrate_v2()
else:
log.debug('Harvest table creation deferred')
@ -46,20 +76,20 @@ class HarvestDomainObject(DomainObject):
key_attr = 'id'
@classmethod
def get(self, key, default=None, attr=None):
def get(cls, key, default=None, attr=None):
'''Finds a single entity in the register.'''
if attr == None:
attr = self.key_attr
attr = cls.key_attr
kwds = {attr: key}
o = self.filter(**kwds).first()
o = cls.filter(**kwds).first()
if o:
return o
else:
return default
@classmethod
def filter(self, **kwds):
query = Session.query(self).autoflush(False)
def filter(cls, **kwds):
query = Session.query(cls).autoflush(False)
return query.filter_by(**kwds)
@ -91,10 +121,6 @@ class HarvestObject(HarvestDomainObject):
'''
@property
def source(self):
return self.job.source
class HarvestGatherError(HarvestDomainObject):
'''Gather errors are raised during the **gather** stage of a harvesting
job.
@ -107,7 +133,19 @@ class HarvestObjectError(HarvestDomainObject):
'''
pass
def create_harvester_tables():
def harvest_object_before_insert_listener(mapper,connection,target):
'''
For compatibility with old harvesters, check if the source id has
been set, and set it automatically from the job if not.
'''
if not target.harvest_source_id or not target.source:
if not target.job:
raise Exception('You must define a Harvest Job for each Harvest Object')
target.source = target.job.source
target.harvest_source_id = target.job.source.id
def define_harvester_tables():
global harvest_source_table
global harvest_job_table
@ -118,9 +156,10 @@ def create_harvester_tables():
harvest_source_table = Table('harvest_source', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('url', types.UnicodeText, nullable=False),
Column('title', types.UnicodeText, default=u''),
Column('description', types.UnicodeText, default=u''),
Column('config', types.UnicodeText, default=u''),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('type',types.UnicodeText,nullable=False),
Column('active',types.Boolean,default=True),
Column('user_id', types.UnicodeText, default=u''),
@ -129,9 +168,9 @@ def create_harvester_tables():
# Was harvesting_job
harvest_job_table = Table('harvest_job', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('gather_started', DateTime),
Column('gather_finished', DateTime),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('gather_started', types.DateTime),
Column('gather_finished', types.DateTime),
Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('status', types.UnicodeText, default=u'New', nullable=False),
)
@ -139,13 +178,15 @@ def create_harvester_tables():
harvest_object_table = Table('harvest_object', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('guid', types.UnicodeText, default=u''),
Column('gathered', DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', DateTime),
Column('current',types.Boolean,default=False),
Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', types.DateTime),
Column('content', types.UnicodeText, nullable=True),
Column('fetch_finished', DateTime),
Column('metadata_modified_date', DateTime),
Column('fetch_finished', types.DateTime),
Column('metadata_modified_date', types.DateTime),
Column('retry_times',types.Integer),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id'), nullable=True),
)
# New table
@ -153,7 +194,7 @@ def create_harvester_tables():
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('message', types.UnicodeText),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
# New table
harvest_object_error_table = Table('harvest_object_error',metadata,
@ -161,7 +202,7 @@ def create_harvester_tables():
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('message',types.UnicodeText),
Column('stage', types.UnicodeText),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
mapper(
@ -196,6 +237,12 @@ def create_harvester_tables():
lazy=True,
backref=u'objects',
),
'source': relation(
HarvestSource,
lazy=True,
backref=u'objects',
),
},
)
@ -220,3 +267,46 @@ def create_harvester_tables():
),
},
)
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
def migrate_v2():
log.debug('Migrating harvest tables to v2. This may take a while...')
conn = Session.connection()
statements = '''
ALTER TABLE harvest_source ADD COLUMN title text;
ALTER TABLE harvest_object ADD COLUMN current boolean;
ALTER TABLE harvest_object ADD COLUMN harvest_source_id text;
ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id) REFERENCES harvest_source(id);
UPDATE harvest_object o SET harvest_source_id = j.source_id FROM harvest_job j WHERE o.harvest_job_id = j.id;
'''
conn.execute(statements)
# Flag current harvest_objects
guids = Session.query(distinct(HarvestObject.guid)) \
.join(Package) \
.filter(HarvestObject.package!=None) \
.filter(Package.state==u'active')
update_statement = '''
UPDATE harvest_object
SET current = TRUE
WHERE id = (
SELECT o.id
FROM harvest_object o JOIN package p ON p.id = o.package_id
WHERE o.package_id IS NOT null AND p.state = 'active'
AND o.guid = '%s'
ORDER BY metadata_modified_date DESC, fetch_finished DESC, gathered DESC
LIMIT 1)
'''
for guid in guids:
conn.execute(update_statement % guid)
conn.execute('UPDATE harvest_object SET current = FALSE WHERE current IS NOT TRUE')
Session.commit()
log.info('Harvest tables migrated to v2')

View File

@ -1,6 +1,7 @@
import os
from logging import getLogger
from pylons import config
from genshi.input import HTML
from genshi.filters import Transformer
@ -8,8 +9,8 @@ import ckan.lib.helpers as h
from ckan.plugins import implements, SingletonPlugin
from ckan.plugins import IRoutes, IConfigurer
from ckan.plugins import IConfigurable, IGenshiStreamFilter
from ckanext.harvest.model import setup
from ckan.plugins import IConfigurable, IActions, IAuthFunctions
from ckanext.harvest.model import setup as model_setup
log = getLogger(__name__)
@ -18,9 +19,32 @@ class Harvest(SingletonPlugin):
implements(IConfigurable)
implements(IRoutes, inherit=True)
implements(IConfigurer, inherit=True)
implements(IActions)
implements(IAuthFunctions)
def configure(self, config):
setup()
auth_profile = config.get('ckan.harvest.auth.profile',None)
if auth_profile:
# Check if auth profile exists
module_root = 'ckanext.harvest.logic.auth'
module_path = '%s.%s' % (module_root, auth_profile)
try:
module = __import__(module_path)
except ImportError,e:
raise ImportError('Unknown auth profile: %s' % auth_profile)
# If we are using the publisher auth profile, make sure CKAN core
# also uses it.
if auth_profile == 'publisher' and \
not config.get('ckan.auth.profile','') == 'publisher':
raise Exception('You must enable the "publisher" auth profile'
+' in CKAN in order to use it on the harvest extension'
+' (adding "ckan.auth.profile=publisher" to your ini file)')
# Setup harvest model
model_setup()
def before_map(self, map):
@ -51,3 +75,71 @@ class Harvest(SingletonPlugin):
config['extra_public_paths'] += ',' + public_dir
else:
config['extra_public_paths'] = public_dir
def get_actions(self):
from ckanext.harvest.logic.action.get import (harvest_source_show,
harvest_source_list,
harvest_job_show,
harvest_job_list,
harvest_object_show,
harvest_object_list,
harvesters_info_show,)
from ckanext.harvest.logic.action.create import (harvest_source_create,
harvest_job_create,
harvest_job_create_all,)
from ckanext.harvest.logic.action.update import (harvest_source_update,
harvest_objects_import,
harvest_jobs_run)
from ckanext.harvest.logic.action.delete import (harvest_source_delete,)
return {
'harvest_source_show': harvest_source_show,
'harvest_source_list': harvest_source_list,
'harvest_job_show': harvest_job_show,
'harvest_job_list': harvest_job_list,
'harvest_object_show': harvest_object_show,
'harvest_object_list': harvest_object_list,
'harvesters_info_show': harvesters_info_show,
'harvest_source_create': harvest_source_create,
'harvest_job_create': harvest_job_create,
'harvest_job_create_all': harvest_job_create_all,
'harvest_source_update': harvest_source_update,
'harvest_source_delete': harvest_source_delete,
'harvest_objects_import': harvest_objects_import,
'harvest_jobs_run':harvest_jobs_run
}
def get_auth_functions(self):
module_root = 'ckanext.harvest.logic.auth'
auth_profile = config.get('ckan.harvest.auth.profile', '')
auth_functions = _get_auth_functions(module_root)
if auth_profile:
module_root = '%s.%s' % (module_root, auth_profile)
auth_functions = _get_auth_functions(module_root,auth_functions)
log.info('Using auth profile at %s' % module_root)
return auth_functions
def _get_auth_functions(module_root, auth_functions = {}):
for auth_module_name in ['get', 'create', 'update','delete']:
module_path = '%s.%s' % (module_root, auth_module_name,)
try:
module = __import__(module_path)
except ImportError,e:
log.debug('No auth module for action "%s"' % auth_module_name)
continue
for part in module_path.split('.')[1:]:
module = getattr(module, part)
for key, value in module.__dict__.items():
if not key.startswith('_'):
auth_functions[key] = value
return auth_functions

View File

@ -46,6 +46,11 @@ body.index.ViewController #content {
color: red;
}
#harvest-sources td{
background-color: white !important;
border-bottom: 1px solid #E3E3E3;
}
.harvester-title{
font-weight: bold;
}
@ -58,3 +63,19 @@ body.index.ViewController #content {
vertical-align: middle;
margin: 0 5px;
}
.source-state-active{
font-weight:bold;
}
.source-state-inactive{
font-weight:bold;
color: red;
}
#harvest-sources .publisher > td{
background-color: #E3E3E3 !important;
padding: 3px;
font-weight: bold;
}

View File

@ -5,6 +5,8 @@
<py:def function="page_title">Harvesting Sources</py:def>
<py:def function="body_class">harvest</py:def>
<py:def function="optional_head">
<link type="text/css" rel="stylesheet" media="all" href="/ckanext/harvest/style.css" />
</py:def>
@ -24,7 +26,7 @@
<label for="show-inactive-sources"> Show inactive sources</label>
</div>
<table id="harvest-sources">
<table id="harvest-sources" class="${'publishers' if c.publisher_auth else ''}" >
<tr>
<th class="action">View</th>
<th class="action">Edit</th>
@ -36,8 +38,21 @@
<th>Next Harvest</th>
<th>Created</th>
</tr>
<?python old_publisher = None ?>
<py:for each="source in c.sources">
<tr class="publisher" py:if="c.publisher_auth and old_publisher != source['publisher_id']">
<py:choose>
<py:when test="source.get('publisher_title')">
<td colspan="9">${source['publisher_title']}</td>
</py:when>
<py:otherwise>
<td colspan="9">${source['publisher_id']}</td>
</py:otherwise>
</py:choose>
<tr py:for="source in c.sources" class="${'active' if source.active else 'inactive'}">
</tr>
<?python old_publisher = source['publisher_id'] ?>
<tr class="${'active' if source.active else 'inactive'}">
<td><a href="harvest/${source.id}"><img src="ckanext/harvest/images/icons/source_view.png" alt="View" title="View" /></a></td>
<td><a href="harvest/edit/${source.id}"><img src="ckanext/harvest/images/icons/source_edit.png" alt="Edit" title="Edit" /></a></td>
<td><a href="harvest/refresh/${source.id}"><img src="ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" title="Refresh" /></a></td>
@ -65,6 +80,7 @@
<td>${h.render_datetime(source.created)}</td>
</tr>
</py:for>
</table>
</py:when>
<py:otherwise>

View File

@ -35,11 +35,44 @@
</py:for>
</ul>
</dd>
<dt class="harvest-source-title"><label class="field_req" for="title">Title</label></dt>
<dd class="harvest-source-title"><input id="title" name="title" size="80" type="text" value="${data.get('title', '')}" /></dd>
<dd class="harvest-source-title field_error" py:if="errors.get('title', '')">${errors.get('title', '')}</dd>
<dd class="harvest-source-title instructions basic">This will be shown as the datasets source.</dd>
<dt><label class="field_opt" for="description">Description</label></dt>
<dd><textarea id="description" name="description" cols="30" rows="2" style="height:75px">${data.get('description', '')}</textarea></dd>
<dd class="instructions basic">You can add your own notes here about what the URL above represents to remind you later.</dd>
<dt><label class="field_opt" for="config">Configuration</label></dt>
<dd><textarea id="config" name="config" cols="30" rows="2" style="height:75px">${data.get('config', '')}</textarea></dd>
<dt py:if="c.publisher_auth"><label class="field_opt" for="groups__${len(data.get('groups', []))}__id">Publisher</label></dt>
<dd py:if="c.publisher_auth and c.groups">
<select id="publisher_id" name="publisher_id">
<py:for each="group in c.groups">
<option value="${group['id']}" py:attrs="{'selected': 'selected' if group['id'] == data.get('publisher_id',None) else None}">${group['title']}</option>
</py:for>
</select>
</dd>
<dd py:if="c.publisher_auth and not c.groups"><em>Cannot add any publishers.</em></dd>
<dt class="harvest-source-config"><label class="field_opt" for="config">Configuration</label></dt>
<dd class="harvest-source-config"><textarea id="config" name="config" cols="30" rows="2" style="height:75px">${data.get('config', '')}</textarea></dd>
<dt><label class="field_opt" for="active">State</label></dt>
<dd>
<select id="active" name="active">
<option py:attrs="{'selected': 'selected' if data.get('active') or not 'active' in data else None}" value="True">active</option>
<option py:attrs="{'selected': 'selected' if 'active' in data and not data.get('active') else None}" value="False">inactive</option>
</select>
<py:if test="data.get('active') or not 'active' in data">
<div>This harvest source is <span class="source-state-active">Active</span></div>
</py:if>
<py:if test="'active' in data and not data.get('active')">
<div>This harvest source is <span class="source-state-inactive">Inactive</span></div>
</py:if>
</dd>
</dl>
</fieldset>
<input id="save" name="save" value="Save" type="submit" /> or <a href="/harvest">Return to the harvest sources list</a>

View File

@ -15,7 +15,10 @@
<h1>Harvest Source Details</h1>
<div id="harvest-source-actions">
<img src="/ckanext/harvest/images/icons/source_edit.png" alt="Edit" /><a href="/harvest/edit/${c.source.id}">Edit source</a> |
<img src="/ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" /><a href="/harvest/refresh/${c.source.id}">Refresh source</a></div>
<img src="/ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" /><a href="/harvest/refresh/${c.source.id}">Refresh source</a> |
<a href="/harvest">Sources list</a>
</div>
<table id="harvest-source-details">
<tr>
<th>ID</th>
@ -33,6 +36,11 @@
<th>Active</th>
<td>${c.source.active}</td>
</tr>
<tr py:if="c.source.title">
<th>Title</th>
<td>${c.source.title}</td>
</tr>
<tr>
<th>Description</th>
<td>${c.source.description}</td>
@ -46,13 +54,18 @@
<td>-</td>
</py:if>
</tr>
<tr>
<tr py:if="c.publisher_auth">
<th>User</th>
<td>${c.source.user_id}</td>
</tr>
<tr>
<tr py:if="c.publisher_auth">
<th>Publisher</th>
<py:if test="c.source.publisher_title">
<td>${c.source.publisher_title}</td>
</py:if>
<py:if test="not c.source.publisher_title">
<td>${c.source.publisher_id}</td>
</py:if>
</tr>
<tr>
<th>Created</th>
@ -60,7 +73,7 @@
</tr>
<tr>
<th>Total jobs</th>
<td>${len(c.source.jobs)}</td>
<td>${c.source.status.job_count}</td>
</tr>
<tr>
<th>Status</th>

View File

@ -0,0 +1,223 @@
import logging
from pprint import pprint
from nose.plugins.skip import SkipTest;
from ckan import model
from ckan.model import Package, Session
from ckan.lib.helpers import url_for,json
from ckan.lib.base import config
from ckan.tests import CreateTestData
from ckan.tests.functional.base import FunctionalTestCase
from ckanext.harvest.plugin import Harvest
from ckanext.harvest.model import HarvestSource, HarvestJob, setup as harvest_model_setup
log = logging.getLogger(__name__)
class HarvestAuthBaseCase():
@classmethod
def setup_class(cls):
harvest_model_setup()
@classmethod
def teardown_class(cls):
pass
def _test_auth_not_allowed(self,user_name = None, source = None, status = 401):
if not source:
# Create harvest source
source = HarvestSource(url=u'http://test-source.com',type='ckan')
Session.add(source)
Session.commit()
if user_name:
extra_environ = {'REMOTE_USER': user_name.encode('utf8')}
else:
extra_environ = {}
# List
res = self.app.get('/harvest', status=status, extra_environ=extra_environ)
# Create
res = self.app.get('/harvest/new', status=status, extra_environ=extra_environ)
# Read
res = self.app.get('/harvest/%s' % source.id, status=status, extra_environ=extra_environ)
# Edit
res = self.app.get('/harvest/edit/%s' % source.id, status=status, extra_environ=extra_environ)
# Refresh
res = self.app.get('/harvest/refresh/%s' % source.id, status=status, extra_environ=extra_environ)
def _test_auth_allowed(self,user_name,auth_profile=None):
extra_environ={'REMOTE_USER': user_name.encode('utf8')}
# List
res = self.app.get('/harvest', extra_environ=extra_environ)
assert 'Harvesting Sources' in res
# Create
res = self.app.get('/harvest/new', extra_environ=extra_environ)
assert 'New harvest source' in res
if auth_profile == 'publisher':
assert 'publisher_id' in res
else:
assert not 'publisher_id' in res
fv = res.forms['source-new']
fv['url'] = u'http://test-source.com'
fv['type'] = u'ckan'
fv['title'] = u'Test harvest source'
fv['description'] = u'Test harvest source'
fv['config'] = u'{"a":1,"b":2}'
if auth_profile == 'publisher':
fv['publisher_id'] = self.publisher1.id
res = fv.submit('save', extra_environ=extra_environ)
assert not 'Error' in res, res
source = Session.query(HarvestSource).first()
assert source.url == u'http://test-source.com'
assert source.type == u'ckan'
# Read
res = self.app.get('/harvest/%s' % source.id, extra_environ=extra_environ)
assert 'Harvest Source Details' in res
assert source.id in res
assert source.title in res
# Edit
res = self.app.get('/harvest/edit/%s' % source.id, extra_environ=extra_environ)
assert 'Edit harvest source' in res
if auth_profile == 'publisher':
assert 'publisher_id' in res
else:
assert not 'publisher_id' in res
fv = res.forms['source-new']
fv['title'] = u'Test harvest source Updated'
res = fv.submit('save', extra_environ=extra_environ)
assert not 'Error' in res, res
source = Session.query(HarvestSource).first()
assert source.title == u'Test harvest source Updated'
# Refresh
res = self.app.get('/harvest/refresh/%s' % source.id, extra_environ=extra_environ)
job = Session.query(HarvestJob).first()
assert job.source_id == source.id
class TestAuthDefaultProfile(FunctionalTestCase,HarvestAuthBaseCase):
@classmethod
def setup_class(cls):
if (config.get('ckan.harvest.auth.profile','') != ''):
raise SkipTest('Skipping default auth profile tests. Set ckan.harvest.auth.profile = \'\' to run them')
super(TestAuthDefaultProfile,cls).setup_class()
def setup(self):
CreateTestData.create()
self.sysadmin_user = model.User.get('testsysadmin')
self.normal_user = model.User.get('annafan')
def teardown(self):
model.repo.rebuild_db()
def test_auth_default_profile_sysadmin(self):
self._test_auth_allowed(self.sysadmin_user.name)
def test_auth_default_profile_normal(self):
self._test_auth_not_allowed(self.normal_user.name)
def test_auth_default_profile_notloggedin(self):
self._test_auth_not_allowed(status=302)
class TestAuthPublisherProfile(FunctionalTestCase,HarvestAuthBaseCase):
@classmethod
def setup_class(cls):
if (config.get('ckan.harvest.auth.profile') != 'publisher'):
raise SkipTest('Skipping publisher auth profile tests. Set ckan.harvest.auth.profile = \'publisher\' to run them')
super(TestAuthPublisherProfile,cls).setup_class()
def setup(self):
model.Session.remove()
CreateTestData.create(auth_profile='publisher')
self.sysadmin_user = model.User.get('testsysadmin')
self.normal_user = model.User.get('annafan') # Does not belong to a publisher
self.publisher1_user = model.User.by_name('russianfan')
self.publisher2_user = model.User.by_name('tester')
# Create two Publishers
rev = model.repo.new_revision()
self.publisher1 = model.Group(name=u'test-publisher1',title=u'Test Publihser 1',type=u'publisher')
Session.add(self.publisher1)
self.publisher2 = model.Group(name=u'test-publisher2',title=u'Test Publihser 2',type=u'publisher')
Session.add(self.publisher2)
member1 = model.Member(table_name = 'user',
table_id = self.publisher1_user.id,
group=self.publisher1,
capacity='admin')
Session.add(member1)
member2 = model.Member(table_name = 'user',
table_id = self.publisher2_user.id,
group=self.publisher2,
capacity='admin')
Session.add(member2)
Session.commit()
def teardown(self):
model.repo.rebuild_db()
def test_auth_publisher_profile_normal(self):
self._test_auth_not_allowed(self.normal_user.name)
def test_auth_publisher_profile_notloggedin(self):
self._test_auth_not_allowed(status=302)
def test_auth_publisher_profile_sysadmin(self):
self._test_auth_allowed(self.sysadmin_user.name,auth_profile='publisher')
def test_auth_publisher_profile_publisher(self):
self._test_auth_allowed(self.publisher1_user.name,auth_profile='publisher')
def test_auth_publisher_profile_different_publisher(self):
# Create a source for publisher 1
source = HarvestSource(url=u'http://test-source.com',type='ckan',
publisher_id=self.publisher1.id)
Session.add(source)
Session.commit()
extra_environ = {'REMOTE_USER': self.publisher2_user.name.encode('utf8')}
# List (Publihsers can see the sources list)
res = self.app.get('/harvest', extra_environ=extra_environ)
assert 'Harvesting Sources' in res
# Create
res = self.app.get('/harvest/new', extra_environ=extra_environ)
assert 'New harvest source' in res
assert 'publisher_id' in res
# Check that this publihser is not allowed to manage sources from other publishers
status = 401
# Read
res = self.app.get('/harvest/%s' % source.id, status=status, extra_environ=extra_environ)
# Edit
res = self.app.get('/harvest/edit/%s' % source.id, status=status, extra_environ=extra_environ)
# Refresh
res = self.app.get('/harvest/refresh/%s' % source.id, status=status, extra_environ=extra_environ)

View File

@ -0,0 +1,40 @@
; ===============================
; ckan harvester example
; ===============================
; symlink or copy this file to /etc/supervisr/conf.d
; change the path/to/virtualenv below to the virtualenv ckan is in.
[program:ckan_gather_consumer]
; Full Path to executable, should be path to virtural environment,
; Full path to config file too.
command=/path/to/pyenv/bin/paster --plugin=ckanext-harvest harvester gather_consumer --config=/path/to/config/std.ini
; user that owns virtual environment.
user=ckan
numprocs=1
stdout_logfile=/var/log/ckan/std/gather_consumer.log
stderr_logfile=/var/log/ckan/std/gather_consumer.log
autostart=true
autorestart=true
startsecs=10
[program:ckan_fetch_consumer]
; Full Path to executable, should be path to virtural environment,
; Full path to config file too.
command=/path/to/pyenv/bin/paster --plugin=ckanext-harvest harvester fetch_consumer --config=/path/to/config/std.ini
; user that owns virtual environment.
user=ckan
numprocs=1
stdout_logfile=/var/log/ckan/std/fetch_consumer.log
stderr_logfile=/var/log/ckan/std/fetch_consumer.log
autostart=true
autorestart=true
startsecs=10

54
test-core.ini Normal file
View File

@ -0,0 +1,54 @@
[DEFAULT]
debug = true
# Uncomment and replace with the address which should receive any error reports
#email_to = you@yourdomain.com
smtp_server = localhost
error_email_from = paste@localhost
[server:main]
use = egg:Paste#http
host = 0.0.0.0
port = 5000
[app:main]
use = config:../ckan/test-core.ini
# Here we hard-code the database and a flag to make default tests
# run fast.
ckan.plugins = harvest ckan_harvester
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.
# Logging configuration
[loggers]
keys = root, ckan, sqlalchemy
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_ckan]
qualname = ckan
handlers =
level = INFO
[logger_sqlalchemy]
handlers =
qualname = sqlalchemy.engine
level = WARN
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s

54
test.ini Normal file
View File

@ -0,0 +1,54 @@
[DEFAULT]
debug = true
# Uncomment and replace with the address which should receive any error reports
#email_to = you@yourdomain.com
smtp_server = localhost
error_email_from = paste@localhost
[server:main]
use = egg:Paste#http
host = 0.0.0.0
port = 5000
[app:main]
use = config:../ckan/test.ini
# Here we hard-code the database and a flag to make default tests
# run fast.
ckan.plugins = harvest ckan_harvester
# NB: other test configuration should go in test-core.ini, which is
# what the postgres tests use.
# Logging configuration
[loggers]
keys = root, ckan, sqlalchemy
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_ckan]
qualname = ckan
handlers =
level = INFO
[logger_sqlalchemy]
handlers =
qualname = sqlalchemy.engine
level = WARN
[handler_console]
class = StreamHandler
args = (sys.stdout,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s] %(message)s