Merge remote-tracking branch 'origin/master' into immediate-harvest

Conflicts:
	README.rst
	ckanext/harvest/commands/harvester.py
	ckanext/harvest/logic/action/create.py
	ckanext/harvest/logic/action/update.py
	ckanext/harvest/logic/auth/update.py
This commit is contained in:
David Read 2015-11-03 00:40:25 +00:00
commit 8a7bc9e1d8
8 changed files with 405 additions and 101 deletions

View File

@ -31,24 +31,27 @@ Installation
ckan.harvest.mq.type = amqp
2. Activate your CKAN virtual environment, for example::
2. Install the extension into your python environment::
$ . /usr/lib/ckan/default/bin/activate
3. Install the ckanext-harvest Python package into your virtual environment::
(pyenv) $ pip install -e git+https://github.com/ckan/ckanext-harvest.git#egg=ckanext-harvest
3. Install the rest of python modules required by the extension::
4. Install the python modules required by the extension::
(pyenv) $ pip install -r pip-requirements.txt
4. Make sure the CKAN configuration ini file contains the harvest main plugin, as
5. Make sure the CKAN configuration ini file contains the harvest main plugin, as
well as the harvester for CKAN instances if you need it (included with the extension)::
ckan.plugins = harvest ckan_harvester
ckan.plugins = harvest ckan_harvester
5. If you haven't done it yet on the previous step, define the backend that you are using with the ``ckan.harvest.mq.type``
option (it defaults to ``rabbitmq``)::
6. If you haven't done it yet on the previous step, define the backend that you
are using with the ``ckan.harvest.mq.type`` option (it defaults to ``amqp``)::
ckan.harvest.mq.type = redis
ckan.harvest.mq.type = redis
There are a number of configuration options available for the backends. These don't need to
be modified at all if you are using the default Redis or RabbitMQ install (step 1). The list
@ -96,25 +99,44 @@ The following operations can be run from the command line using the
harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
- create new harvest source
harvester rmsource {id}
- remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects
harvester source {source-id/name}
- shows a harvest source
harvester clearsource {id}
- clears all datasets, jobs and objects related to a harvest source, but keeps the source itself
harvester rmsource {source-id/name}
- remove (deactivate) a harvester source, whilst leaving any related
datasets, jobs and objects
harvester clearsource {source-id/name}
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself
harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources
harvester job {source-id}
harvester job {source-id/name}
- create new harvest job
harvester jobs
- lists harvest jobs
harvester job_abort {source-id/name}
- marks a job as "Aborted" so that the source can be restarted afresh.
It ensures that the job's harvest objects status are also marked
finished. You should ensure that neither the job nor its objects are
currently in the gather/fetch queues.
harvester run
- runs any scheduled harvest jobs and looks for jobs that can be marked
as finished
- starts any harvest jobs that have been created by putting them onto
the gather queue. Also checks running jobs - if finished it
changes their status to Finished.
harvester run_test {source-id/name}
- runs a harvest - for testing only.
This does all the stages of the harvest (creates job, gather, fetch,
import) without involving the web UI or the queue backends. This is
useful for testing a harvester without having to fire up
gather/fetch_consumer processes, as is done in production.
harvester gather_consumer
- starts the consumer for the gathering queue
@ -124,14 +146,22 @@ The following operations can be run from the command line using the
harvester purge_queues
- removes all jobs from fetch and gather queue
WARNING: if using Redis, this command purges all data in the current
Redis database
harvester [-j] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, optionally belonging to a certain source.
Please note that no objects will be fetched from the remote server. It will only affect
the last fetched objects already present in the database.
harvester [-j] [-o] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, for a certain
source or a single harvest object. Please note that no objects will
be fetched from the remote server. It will only affect the objects
already present in the database.
If the -j flag is provided, the objects are not joined to existing datasets. This may be useful
when importing objects for the first time.
To import a particular harvest source, specify its id as an argument.
To import a particular harvest object use the -o option.
To import a particular package use the -p option.
You will need to specify the -j flag in cases where the datasets are
not yet created (e.g. first harvest, or all previous harvests have
failed)
The --segments flag allows to define a string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
@ -424,14 +454,43 @@ Here you can also find other examples of custom harvesters:
* https://github.com/ckan/ckanext-dcat/tree/master/ckanext/dcat/harvesters
* https://github.com/ckan/ckanext-spatial/tree/master/ckanext/spatial/harvesters
Running the harvest jobs
========================
The harvesting extension uses two different queues, one that handles the
gathering and another one that handles the fetching and importing. To start
the consumers run the following command
(make sure you have your python environment activated)::
There are two ways to run a harvest::
1. ``harvester run_test`` for the command-line, suitable for testing
2. ``harvester run`` used by the Web UI and scheduled runs
harvester run_test
------------------
You can run a harvester simply using the ``run_test`` command. This is handy
for running a harvest with one command in the console and see all the output
in-line. It runs the gather, fetch and import stages all in the same process.
This is useful for developing a harvester because you can insert break-points
in your harvester, and rerun a harvest without having to restart the
gather_consumer and fetch_consumer processes each time. In addition, because it
doesn't use the queue backends it doesn't interfere with harvests of other
sources that may be going on in the background.
However running this way, if gather_stage, fetch_stage or import_stage raise an
exception, they are not caught, whereas with ``harvester run`` they are handled
slightly differently as they are called by queue.py. So when testing this
aspect its best to use ``harvester run``.
harvester run
-------------
When a harvest job is started by a user in the Web UI, or by a scheduled
harvest, the harvest is started by the ``harvester run`` command. This is the
normal method in production systems and scales well.
In this case, the harvesting extension uses two different queues: one that
handles the gathering and another one that handles the fetching and importing.
To start the consumers run the following command (make sure you have your
python environment activated)::
paster --plugin=ckanext-harvest harvester gather_consumer --config=mysite.ini
@ -448,9 +507,19 @@ The ``run`` command not only starts any pending harvesting jobs, but also
flags those that are finished, allowing new jobs to be created on that particular
source and refreshing the source statistics. That means that you will need to run
this command before being able to create a new job on a source that was being
harvested (On a production site you will typically have a cron job that runs the
harvested. (On a production site you will typically have a cron job that runs the
command regularly, see next section).
Occasionally you can find a harvesting job is in a "limbo state" where the job
has run with errors but the ``harvester run`` command will not mark it as
finished, and therefore you cannot run another job. This is due to particular
harvester not handling errors correctly e.g. during development. In this
circumstance, ensure that the gather & fetch consumers are running and have
nothing more to consume, and then run this abort command with the name or id of
the harvest source::
paster --plugin=ckanext-harvest harvester job_abort {source-id/name} --config=mysite.ini
Setting up the harvesters on a production server
================================================

View File

@ -17,25 +17,44 @@ class Harvester(CkanCommand):
harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
- create new harvest source
harvester rmsource {id}
- remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects
harvester source {source-id/name}
- shows a harvest source
harvester clearsource {id}
- clears all datasets, jobs and objects related to a harvest source, but keeps the source itself
harvester rmsource {source-id/name}
- remove (deactivate) a harvester source, whilst leaving any related
datasets, jobs and objects
harvester clearsource {source-id/name}
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself
harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources
harvester job {source-id}
harvester job {source-id/name}
- create new harvest job and runs it (puts it on the gather queue)
harvester jobs
- lists harvest jobs
harvester job_abort {source-id/name}
- marks a job as "Aborted" so that the source can be restarted afresh.
It ensures that the job's harvest objects status are also marked
finished. You should ensure that neither the job nor its objects are
currently in the gather/fetch queues.
harvester run
- runs any scheduled harvest jobs and looks for jobs that can be marked
as finished
- starts any harvest jobs that have been created by putting them onto
the gather queue. Also checks running jobs - if finished it
changes their status to Finished.
harvester run_test {source-id/name}
- runs a harvest - for testing only.
This does all the stages of the harvest (creates job, gather, fetch,
import) without involving the web UI or the queue backends. This is
useful for testing a harvester without having to fire up
gather/fetch_consumer processes, as is done in production.
harvester gather_consumer
- starts the consumer for the gathering queue
@ -54,10 +73,13 @@ class Harvester(CkanCommand):
be fetched from the remote server. It will only affect the objects
already present in the database.
To perform it on a particular object use the -o flag.
To import a particular harvest source, specify its id as an argument.
To import a particular harvest object use the -o option.
To import a particular package use the -p option.
If the -j flag is provided, the objects are not joined to existing datasets. This may be useful
when importing objects for the first time.
You will need to specify the -j flag in cases where the datasets are
not yet created (e.g. first harvest, or all previous harvests have
failed)
The --segments flag allows to define a string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
@ -115,7 +137,10 @@ class Harvester(CkanCommand):
sys.exit(1)
cmd = self.args[0]
if cmd == 'source':
self.create_harvest_source()
if len(self.args) > 2:
self.create_harvest_source()
else:
self.show_harvest_source()
elif cmd == 'rmsource':
self.remove_harvest_source()
elif cmd == 'clearsource':
@ -126,8 +151,12 @@ class Harvester(CkanCommand):
self.create_harvest_job()
elif cmd == 'jobs':
self.list_harvest_jobs()
elif cmd == 'job_abort':
self.job_abort()
elif cmd == 'run':
self.run_harvester()
elif cmd == 'run_test':
self.run_test_harvest()
elif cmd == 'gather_consumer':
import logging
from ckanext.harvest.queue import (get_gather_consumer,
@ -249,25 +278,44 @@ class Harvester(CkanCommand):
print str(e.error_dict)
raise e
def show_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source name'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
self.print_harvest_source(source)
def remove_harvest_source(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
get_action('harvest_source_delete')(context,{'id':source_id})
print 'Removed harvest source: %s' % source_id
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
get_action('harvest_source_delete')(context, {'id': source['id']})
print 'Removed harvest source: %s' % source_id_or_name
def clear_harvest_source(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
get_action('harvest_source_clear')(context,{'id':source_id})
print 'Cleared harvest source: %s' % source_id
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
get_action('harvest_source_clear')(context, {'id': source['id']})
print 'Cleared harvest source: %s' % source_id_or_name
def list_harvest_sources(self):
if len(self.args) >= 2 and self.args[1] == 'all':
@ -284,14 +332,18 @@ class Harvester(CkanCommand):
def create_harvest_job(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
context = {'model': model,'session':model.Session, 'user': self.admin_user['name']}
job = get_action('harvest_job_create')(
context, {'source_id': source_id, 'run': True})
context, {'source_id': source['id'], 'run': True})
self.print_harvest_job(job)
jobs = get_action('harvest_job_list')(context,{'status':u'New'})
@ -304,23 +356,91 @@ class Harvester(CkanCommand):
self.print_harvest_jobs(jobs)
self.print_there_are(what='harvest job', sequence=jobs)
def job_abort(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
context = {'model': model, 'user': self.admin_user['name'],
'session': model.Session}
job = get_action('harvest_job_abort')(context,
{'source_id': source['id']})
print 'Job status: {0}'.format(job['status'])
def run_harvester(self):
context = {'model': model, 'user': self.admin_user['name'],
'session': model.Session}
get_action('harvest_jobs_run')(context, {})
def run_test_harvest(self):
from ckanext.harvest import queue
from ckanext.harvest.tests import lib
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.model import HarvestJob
# Determine the source
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
# Determine the job
try:
job_dict = get_action('harvest_job_create')(
context, {'source_id': source['id']})
except HarvestJobExists:
running_jobs = get_action('harvest_job_list')(
context, {'source_id': source['id'], 'status': 'Running'})
if running_jobs:
print '\nSource "%s" apparently has a "Running" job:\n%r' \
% (source.get('name') or source['id'], running_jobs)
resp = raw_input('Abort it? (y/n)')
if not resp.lower().startswith('y'):
sys.exit(1)
job_dict = get_action('harvest_job_abort')(
context, {'source_id': source['id']})
else:
print 'Reusing existing harvest job'
jobs = get_action('harvest_job_list')(
context, {'source_id': source['id'], 'status': 'New'})
assert len(jobs) == 1, \
'Multiple "New" jobs for this source! %r' % jobs
job_dict = jobs[0]
job_obj = HarvestJob.get(job_dict['id'])
harvester = queue.get_harvester(source['source_type'])
assert harvester, \
'No harvester found for type: %s' % source['source_type']
lib.run_harvest_job(job_obj, harvester)
def import_stage(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
source_id_or_name = unicode(self.args[1])
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
source_id = source['id']
else:
source_id = None
context = {'model': model, 'session':model.Session, 'user': self.admin_user['name'],
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name'],
'join_datasets': not self.options.no_join_datasets,
'segments': self.options.segments}
objs_count = get_action('harvest_objects_import')(context,{
'source_id': source_id,
'harvest_object_id': self.options.harvest_object_id,
@ -347,9 +467,16 @@ class Harvester(CkanCommand):
def print_harvest_source(self, source):
print 'Source id: %s' % source.get('id')
if 'name' in source:
# 'name' is only there if the source comes from the Package
print ' name: %s' % source.get('name')
print ' url: %s' % source.get('url')
print ' type: %s' % source.get('type')
print ' active: %s' % (source.get('active', source.get('state') == 'active'))
# 'type' if source comes from HarvestSource, 'source_type' if it comes
# from the Package
print ' type: %s' % (source.get('source_type') or
source.get('type'))
print ' active: %s' % (source.get('active',
source.get('state') == 'active'))
print 'frequency: %s' % source.get('frequency')
print ' jobs: %s' % source.get('status').get('job_count')
print ''

View File

@ -95,13 +95,16 @@ def harvest_job_create(context, data_dict):
# Check if the source is active
if not source.active:
log.warn('Harvest job cannot be created for inactive source %s', source_id)
log.warn('Harvest job cannot be created for inactive source %s',
source_id)
raise Exception('Can not create jobs on inactive sources')
# Check if there already is an unrun or currently running job for this source
# Check if there already is an unrun or currently running job for this
# source
exists = _check_for_existing_jobs(context, source_id)
if exists:
log.warn('There is already an unrun job %r for this source %s', exists, source_id)
log.warn('There is already an unrun job %r for this source %s',
exists, source_id)
raise HarvestJobExists('There already is an unrun job for this source')
job = HarvestJob()

View File

@ -20,7 +20,7 @@ from ckanext.harvest.logic.dictization import (harvest_source_dictize,
log = logging.getLogger(__name__)
@side_effect_free
def harvest_source_show(context,data_dict):
def harvest_source_show(context, data_dict):
'''
Returns the metadata of a harvest source
@ -234,6 +234,11 @@ def harvest_job_report(context, data_dict):
@side_effect_free
def harvest_job_list(context,data_dict):
'''Returns a list of jobs and details of objects and errors.
:param status: filter by e.g. "New" or "Finished" jobs
:param source_id: filter by a harvest source
'''
check_access('harvest_job_list',context,data_dict)
@ -315,6 +320,7 @@ def harvest_object_list(context,data_dict):
@side_effect_free
def harvesters_info_show(context,data_dict):
'''Returns details of the installed harvesters.'''
check_access('harvesters_info_show',context,data_dict)

View File

@ -83,10 +83,12 @@ def harvest_source_update(context,data_dict):
return source
def harvest_source_clear(context,data_dict):
def harvest_source_clear(context, data_dict):
'''
Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself.
This is useful to clean history of long running harvest sources to start again fresh.
Clears all datasets, jobs and objects related to a harvest source, but
keeps the source itself. This is useful to clean history of long running
harvest sources to start again fresh.
:param id: the id of the harvest source to clear
:type id: string
@ -94,7 +96,7 @@ def harvest_source_clear(context,data_dict):
'''
check_access('harvest_source_clear',context,data_dict)
harvest_source_id = data_dict.get('id',None)
harvest_source_id = data_dict.get('id', None)
source = HarvestSource.get(harvest_source_id)
if not source:
@ -182,6 +184,14 @@ def harvest_source_clear(context,data_dict):
return {'id': harvest_source_id}
def harvest_source_index_clear(context,data_dict):
'''
Clears all datasets, jobs and objects related to a harvest source, but
keeps the source itself. This is useful to clean history of long running
harvest sources to start again fresh.
:param id: the id of the harvest source to clear
:type id: string
'''
check_access('harvest_source_clear',context,data_dict)
harvest_source_id = data_dict.get('id',None)
@ -208,14 +218,26 @@ def harvest_source_index_clear(context,data_dict):
return {'id': harvest_source_id}
def harvest_objects_import(context,data_dict):
def harvest_objects_import(context, data_dict):
'''
Reimports the current harvest objects
It performs the import stage with the last fetched objects, optionally
belonging to a certain source.
Please note that no objects will be fetched from the remote server.
It will only affect the last fetched objects already present in the
database.
Reimports the existing harvest objects, specified by either source_id,
harvest_object_id or package_id.
It performs the import stage with the last fetched objects, optionally
belonging to a certain source.
Please note that no objects will be fetched from the remote server.
It will only affect the last fetched objects already present in the
database.
:param source_id: the id of the harvest source to import
:type source_id: string
:param harvest_object_id: the id of the harvest object to import
:type harvest_object_id: string
:param package_id: the id or name of the package to import
:type package_id: string
'''
log.info('Harvest objects import: %r', data_dict)
check_access('harvest_objects_import',context,data_dict)
@ -426,6 +448,64 @@ def harvest_send_job_to_gather_queue(context, data_dict):
return harvest_job_dictize(job_obj, context)
def harvest_job_abort(context, data_dict):
'''
Aborts a harvest job. Given a harvest source_id, it looks for the latest
one and (assuming it not already Finished) marks it as Finished. It also
marks any of that source's harvest objects and (if not complete or error)
marks them "ERROR", so any left in limbo are cleaned up. Does not actually
stop running any queued harvest fetchs/objects.
:param source_id: the name or id of the harvest source with a job to abort
:type source_id: string
'''
check_access('harvest_job_abort', context, data_dict)
model = context['model']
source_id = data_dict.get('source_id', None)
source = harvest_source_show(context, {'id': source_id})
# HarvestJob set status to 'Finished'
# Don not use harvest_job_list since it can use a lot of memory
last_job = model.Session.query(HarvestJob) \
.filter_by(source_id=source['id']) \
.order_by(HarvestJob.created.desc()).first()
if not last_job:
raise NotFound('Error: source has no jobs')
job = get_action('harvest_job_show')(context,
{'id': last_job.id})
if job['status'] != 'Finished':
# i.e. New or Running
job_obj = HarvestJob.get(job['id'])
job_obj.status = new_status = 'Finished'
model.repo.commit_and_remove()
log.info('Harvest job changed status from "%s" to "%s"',
job['status'], new_status)
else:
log.info('Harvest job unchanged. Source %s status is: "%s"',
job['id'], job['status'])
# HarvestObjects set to ERROR
job_obj = HarvestJob.get(job['id'])
objs = job_obj.objects
for obj in objs:
if obj.state not in ('COMPLETE', 'ERROR'):
old_state = obj.state
obj.state = 'ERROR'
log.info('Harvest object changed state from "%s" to "%s": %s',
old_state, obj.state, obj.id)
else:
log.info('Harvest object not changed from "%s": %s',
obj.state, obj.id)
model.repo.commit_and_remove()
job_obj = HarvestJob.get(job['id'])
return harvest_job_dictize(job_obj, context)
@logic.side_effect_free
def harvest_sources_reindex(context, data_dict):
'''
@ -463,7 +543,8 @@ def harvest_source_reindex(context, data_dict):
context.update({'ignore_auth': True})
package_dict = logic.get_action('harvest_source_show')(context,
{'id': harvest_source_id})
log.debug('Updating search index for harvest source {0}'.format(harvest_source_id))
log.debug('Updating search index for harvest source: {0}'.format(
package_dict.get('name') or harvest_source_id))
# Remove configuration values
new_dict = {}

View File

@ -70,6 +70,15 @@ def harvest_send_job_to_gather_queue(context, data_dict):
return harvest_job_create(context, data_dict)
def harvest_job_abort(context, data_dict):
'''
Authorization check for aborting a running harvest job
Same permissions as running one
'''
return harvest_jobs_run(context, data_dict)
def harvest_sources_reindex(context, data_dict):
'''
Authorization check for reindexing all harvest sources

View File

@ -118,7 +118,13 @@ class HarvestSource(HarvestDomainObject):
or "inactive". The harvesting processes are not fired on inactive
sources.
'''
pass
def __repr__(self):
return '<HarvestSource id=%s title=%s url=%s active=%r>' % \
(self.id, self.title, self.url, self.active)
def __str__(self):
return self.__repr__().encode('ascii', 'ignore')
class HarvestJob(HarvestDomainObject):
'''A Harvesting Job is performed in two phases. In first place, the

View File

@ -14,8 +14,9 @@ from ckanext.harvest.interfaces import IHarvester
log = logging.getLogger(__name__)
assert not log.disabled
__all__ = ['get_gather_publisher', 'get_gather_consumer', \
'get_fetch_publisher', 'get_fetch_consumer']
__all__ = ['get_gather_publisher', 'get_gather_consumer',
'get_fetch_publisher', 'get_fetch_consumer',
'get_harvester']
PORT = 5672
USERID = 'guest'
@ -230,42 +231,38 @@ def gather_callback(channel, method, header, body):
# Send the harvest job to the plugins that implement
# the Harvester interface, only if the source type
# matches
harvester_found = False
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type:
harvester_found = True
harvester = get_harvester(job.source.type)
try:
harvest_object_ids = gather_stage(harvester, job)
except (Exception, KeyboardInterrupt):
channel.basic_ack(method.delivery_tag)
raise
if harvester:
try:
harvest_object_ids = gather_stage(harvester, job)
except (Exception, KeyboardInterrupt):
channel.basic_ack(method.delivery_tag)
raise
if not isinstance(harvest_object_ids, list):
log.error('Gather stage failed')
publisher.close()
channel.basic_ack(method.delivery_tag)
return False
if not isinstance(harvest_object_ids, list):
log.error('Gather stage failed')
publisher.close()
channel.basic_ack(method.delivery_tag)
return False
if len(harvest_object_ids) == 0:
log.info('No harvest objects to fetch')
publisher.close()
channel.basic_ack(method.delivery_tag)
return False
if len(harvest_object_ids) == 0:
log.info('No harvest objects to fetch')
publisher.close()
channel.basic_ack(method.delivery_tag)
return False
log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id':id})
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id':id})
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
if not harvester_found:
else:
# This can occur if you:
# * remove a harvester and it still has sources that are then
# refreshed
# * add a new harvester and restart CKAN but not the gather
# queue.
# * remove a harvester and it still has sources that are then refreshed
# * add a new harvester and restart CKAN but not the gather queue.
msg = 'System error - No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job)
err.save()
@ -276,6 +273,12 @@ def gather_callback(channel, method, header, body):
channel.basic_ack(method.delivery_tag)
def get_harvester(harvest_source_type):
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == harvest_source_type:
return harvester
def gather_stage(harvester, job):
'''Calls the harvester's gather_stage, returning harvest object ids, with
some error handling.