Merge branch 'master' of github.com:ckan/ckanext-harvest into 157-version-three-apify
Conflicts: README.rst
This commit is contained in:
commit 14f372aec6
121 README.rst
@@ -36,25 +36,27 @@ running a version lower than 2.0.

        ckan.harvest.mq.type = amqp

-2. Install the extension into your python environment::
+2. Activate your CKAN virtual environment, for example::
+
+       $ . /usr/lib/ckan/default/bin/activate
+
+3. Install the ckanext-harvest Python package into your virtual environment::

        (pyenv) $ pip install -e git+https://github.com/ckan/ckanext-harvest.git#egg=ckanext-harvest

-3. Install the rest of python modules required by the extension::
+4. Install the python modules required by the extension::

        (pyenv) $ pip install -r pip-requirements.txt

-4. Make sure the CKAN configuration ini file contains the harvest main plugin, as
+5. Make sure the CKAN configuration ini file contains the harvest main plugin, as
    well as the harvester for CKAN instances if you need it (included with the extension)::

        ckan.plugins = harvest ckan_harvester

-5. If you haven't done it yet on the previous step, define the backend that you
-   are using with the ``ckan.harvest.mq.type`` option (it defaults to
-   ``rabbitmq``)::
+6. If you haven't done it yet on the previous step, define the backend that you
+   are using with the ``ckan.harvest.mq.type`` option (it defaults to ``amqp``)::

        ckan.harvest.mq.type = redis

 There are a number of configuration options available for the backends. These don't need to
 be modified at all if you are using the default Redis or RabbitMQ install (step 1). The list
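Pulling the configuration lines from this hunk together, the relevant section of the CKAN ini file would look roughly like this (a sketch; ``[app:main]`` is the usual CKAN section name, and ``redis`` is the example value the README itself uses):

```
[app:main]
ckan.plugins = harvest ckan_harvester
ckan.harvest.mq.type = redis
```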
@@ -102,24 +104,44 @@ The following operations can be run from the command line using the
       harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
         - create new harvest source

-      harvester rmsource {id}
-        - remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects
+      harvester source {source-id/name}
+        - shows a harvest source

-      harvester clearsource {id}
-        - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself
+      harvester rmsource {source-id/name}
+        - remove (deactivate) a harvester source, whilst leaving any related
+          datasets, jobs and objects
+
+      harvester clearsource {source-id/name}
+        - clears all datasets, jobs and objects related to a harvest source,
+          but keeps the source itself

       harvester sources [all]
         - lists harvest sources
           If 'all' is defined, it also shows the Inactive sources

-      harvester job {source-id}
+      harvester job {source-id/name}
         - create new harvest job

       harvester jobs
         - lists harvest jobs

+      harvester job_abort {source-id/name}
+        - marks a job as "Aborted" so that the source can be restarted afresh.
+          It ensures that the job's harvest objects status are also marked
+          finished. You should ensure that neither the job nor its objects are
+          currently in the gather/fetch queues.
+
       harvester run
-        - runs harvest jobs
+        - starts any harvest jobs that have been created by putting them onto
+          the gather queue. Also checks running jobs and if finished, it
+          changes their status to Finished.
+
+      harvester run_test {source-id/name}
+        - runs a harvest - for testing only.
+          This does all the stages of the harvest (creates job, gather, fetch,
+          import) without involving the web UI or the queue backends. This is
+          useful for testing a harvester without having to fire up
+          gather/fetch_consumer processes, as is done in production.

       harvester gather_consumer
         - starts the consumer for the gathering queue
@@ -129,14 +151,22 @@ The following operations can be run from the command line using the

       harvester purge_queues
         - removes all jobs from fetch and gather queue
+          WARNING: if using Redis, this command purges all data in the current
+          Redis database

-      harvester [-j] [--segments={segments}] import [{source-id}]
-        - perform the import stage with the last fetched objects, optionally belonging to a certain source.
-          Please note that no objects will be fetched from the remote server. It will only affect
-          the last fetched objects already present in the database.
+      harvester [-j] [-o] [--segments={segments}] import [{source-id}]
+        - perform the import stage with the last fetched objects, for a certain
+          source or a single harvest object. Please note that no objects will
+          be fetched from the remote server. It will only affect the objects
+          already present in the database.

-          If the -j flag is provided, the objects are not joined to existing datasets. This may be useful
-          when importing objects for the first time.
+          To import a particular harvest source, specify its id as an argument.
+          To import a particular harvest object use the -o option.
+          To import a particular package use the -p option.
+
+          You will need to specify the -j flag in cases where the datasets are
+          not yet created (e.g. first harvest, or all previous harvests have
+          failed)

           The --segments flag allows to define a string containing hex digits that represent which of
           the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
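The ``--segments`` behaviour described above can be sketched in a few lines of Python. Note the assumption, made here purely for illustration, that an object's segment is taken from the first hex character of its id; the extension may derive the segment differently:

```python
def select_segments(object_ids, segments):
    """Filter harvest object ids down to the requested segments.

    ``segments`` is a string of hex digits, e.g. '15af' selects
    segments 1, 5, a and f, as in the README example above.
    Deriving the segment from the first character of the id is an
    illustrative assumption, not ckanext-harvest's actual rule.
    """
    wanted = set(segments.lower())
    return [oid for oid in object_ids if oid[0].lower() in wanted]
```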
@@ -429,14 +459,43 @@ Here you can also find other examples of custom harvesters:
 * https://github.com/ckan/ckanext-dcat/tree/master/ckanext/dcat/harvesters
 * https://github.com/ckan/ckanext-spatial/tree/master/ckanext/spatial/harvesters


 Running the harvest jobs
 ========================

-The harvesting extension uses two different queues, one that handles the
-gathering and another one that handles the fetching and importing. To start
-the consumers run the following command
-(make sure you have your python environment activated)::
+There are two ways to run a harvest::
+
+1. ``harvester run_test`` for the command-line, suitable for testing
+2. ``harvester run`` used by the Web UI and scheduled runs
+
+harvester run_test
+------------------
+
+You can run a harvester simply using the ``run_test`` command. This is handy
+for running a harvest with one command in the console and see all the output
+in-line. It runs the gather, fetch and import stages all in the same process.
+
+This is useful for developing a harvester because you can insert break-points
+in your harvester, and rerun a harvest without having to restart the
+gather_consumer and fetch_consumer processes each time. In addition, because it
+doesn't use the queue backends it doesn't interfere with harvests of other
+sources that may be going on in the background.
+
+However running this way, if gather_stage, fetch_stage or import_stage raise an
+exception, they are not caught, whereas with ``harvester run`` they are handled
+slightly differently as they are called by queue.py. So when testing this
+aspect its best to use ``harvester run``.
+
+harvester run
+-------------
+
+When a harvest job is started by a user in the Web UI, or by a scheduled
+harvest, the harvest is started by the ``harvester run`` command. This is the
+normal method in production systems and scales well.
+
+In this case, the harvesting extension uses two different queues: one that
+handles the gathering and another one that handles the fetching and importing.
+To start the consumers run the following command (make sure you have your
+python environment activated)::

     paster --plugin=ckanext-harvest harvester gather_consumer --config=mysite.ini
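The gather/fetch split described in this section can be illustrated with in-process queues standing in for Redis or RabbitMQ (a toy sketch; all names here are illustrative, not the extension's API):

```python
from queue import Queue

gather_q, fetch_q = Queue(), Queue()

def gather_stage(source_url):
    # A real harvester would list remote dataset identifiers here.
    return ['%s/dataset/%d' % (source_url, i) for i in range(3)]

def run(source_url):
    # 'harvester run' puts pending jobs onto the gather queue.
    gather_q.put(source_url)

def consume_all():
    imported = []
    # Gather consumer: turn each job into harvest objects on the fetch queue.
    while not gather_q.empty():
        for obj_id in gather_stage(gather_q.get()):
            fetch_q.put(obj_id)
    # Fetch/import consumer: process each harvest object.
    while not fetch_q.empty():
        imported.append(fetch_q.get())
    return imported
```

In production the two consumers run as separate long-lived processes, which is why ``run_test`` (single process, no queue backend) is more convenient for development.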
@@ -453,9 +512,19 @@ The ``run`` command not only starts any pending harvesting jobs, but also
 flags those that are finished, allowing new jobs to be created on that particular
 source and refreshing the source statistics. That means that you will need to run
 this command before being able to create a new job on a source that was being
-harvested (On a production site you will typically have a cron job that runs the
+harvested. (On a production site you will typically have a cron job that runs the
 command regularly, see next section).

+Occasionally you can find a harvesting job is in a "limbo state" where the job
+has run with errors but the ``harvester run`` command will not mark it as
+finished, and therefore you cannot run another job. This is due to particular
+harvester not handling errors correctly e.g. during development. In this
+circumstance, ensure that the gather & fetch consumers are running and have
+nothing more to consume, and then run this abort command with the name or id of
+the harvest source::
+
+    paster --plugin=ckanext-harvest harvester job_abort {source-id/name} --config=mysite.ini
+

 Setting up the harvesters on a production server
 ================================================
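The abort behaviour described above boils down to two state changes: mark the latest job Finished, and mark any in-limbo harvest objects ERROR. Sketched here with plain dicts standing in for the HarvestJob and HarvestObject models (a simplification of the real action, which also commits to the database):

```python
def abort_job(job):
    """Mark a job Finished and its unfinished objects ERROR.

    Mirrors the job_abort semantics documented above; COMPLETE and
    ERROR objects are left untouched.
    """
    if job['status'] != 'Finished':          # i.e. New or Running
        job['status'] = 'Finished'
    for obj in job['objects']:
        if obj['state'] not in ('COMPLETE', 'ERROR'):
            obj['state'] = 'ERROR'           # clean up objects left in limbo
    return job
```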
@@ -18,24 +18,44 @@ class Harvester(CkanCommand):
       harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
         - create new harvest source

-      harvester rmsource {id}
-        - remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects
+      harvester source {source-id/name}
+        - shows a harvest source

-      harvester clearsource {id}
-        - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself
+      harvester rmsource {source-id/name}
+        - remove (deactivate) a harvester source, whilst leaving any related
+          datasets, jobs and objects
+
+      harvester clearsource {source-id/name}
+        - clears all datasets, jobs and objects related to a harvest source,
+          but keeps the source itself

       harvester sources [all]
         - lists harvest sources
           If 'all' is defined, it also shows the Inactive sources

-      harvester job {source-id}
+      harvester job {source-id/name}
         - create new harvest job

       harvester jobs
         - lists harvest jobs

+      harvester job_abort {source-id/name}
+        - marks a job as "Aborted" so that the source can be restarted afresh.
+          It ensures that the job's harvest objects status are also marked
+          finished. You should ensure that neither the job nor its objects are
+          currently in the gather/fetch queues.
+
       harvester run
-        - runs harvest jobs
+        - starts any harvest jobs that have been created by putting them onto
+          the gather queue. Also checks running jobs and if finished, it
+          changes their status to Finished.
+
+      harvester run_test {source-id/name}
+        - runs a harvest - for testing only.
+          This does all the stages of the harvest (creates job, gather, fetch,
+          import) without involving the web UI or the queue backends. This is
+          useful for testing a harvester without having to fire up
+          gather/fetch_consumer processes, as is done in production.

       harvester gather_consumer
         - starts the consumer for the gathering queue
@@ -54,10 +74,13 @@ class Harvester(CkanCommand):
          be fetched from the remote server. It will only affect the objects
          already present in the database.

-         To perform it on a particular object use the -o flag.
+         To import a particular harvest source, specify its id as an argument.
+         To import a particular harvest object use the -o option.
+         To import a particular package use the -p option.

-         If the -j flag is provided, the objects are not joined to existing datasets. This may be useful
-         when importing objects for the first time.
+         You will need to specify the -j flag in cases where the datasets are
+         not yet created (e.g. first harvest, or all previous harvests have
+         failed)

          The --segments flag allows to define a string containing hex digits that represent which of
          the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
@@ -115,7 +138,10 @@ class Harvester(CkanCommand):
             sys.exit(1)
         cmd = self.args[0]
         if cmd == 'source':
-            self.create_harvest_source()
+            if len(self.args) > 2:
+                self.create_harvest_source()
+            else:
+                self.show_harvest_source()
         elif cmd == 'rmsource':
             self.remove_harvest_source()
         elif cmd == 'clearsource':
@@ -126,8 +152,12 @@ class Harvester(CkanCommand):
             self.create_harvest_job()
         elif cmd == 'jobs':
             self.list_harvest_jobs()
+        elif cmd == 'job_abort':
+            self.job_abort()
         elif cmd == 'run':
             self.run_harvester()
+        elif cmd == 'run_test':
+            self.run_test_harvest()
         elif cmd == 'gather_consumer':
             import logging
             from ckanext.harvest.queue import (get_gather_consumer,
@@ -240,7 +270,8 @@ class Harvester(CkanCommand):

             # Create a harvest job for the new source if not regular job.
             if not data_dict['frequency']:
-                get_action('harvest_job_create')(context,{'source_id':source['id']})
+                get_action('harvest_job_create')(
+                    context, {'source_id': source['id']})
                 print 'A new Harvest Job for this source has also been created'

         except ValidationError,e:
@@ -248,25 +279,44 @@ class Harvester(CkanCommand):
             print str(e.error_dict)
             raise e

+    def show_harvest_source(self):
+
+        if len(self.args) >= 2:
+            source_id_or_name = unicode(self.args[1])
+        else:
+            print 'Please provide a source name'
+            sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+        self.print_harvest_source(source)
+
     def remove_harvest_source(self):
         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
         else:
             print 'Please provide a source id'
             sys.exit(1)
-        context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
-        get_action('harvest_source_delete')(context,{'id':source_id})
-        print 'Removed harvest source: %s' % source_id
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+        get_action('harvest_source_delete')(context, {'id': source['id']})
+        print 'Removed harvest source: %s' % source_id_or_name

     def clear_harvest_source(self):
         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
         else:
             print 'Please provide a source id'
             sys.exit(1)
-        context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
-        get_action('harvest_source_clear')(context,{'id':source_id})
-        print 'Cleared harvest source: %s' % source_id
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+        get_action('harvest_source_clear')(context, {'id': source['id']})
+        print 'Cleared harvest source: %s' % source_id_or_name

     def list_harvest_sources(self):
         if len(self.args) >= 2 and self.args[1] == 'all':
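Each of the rewritten commands above applies the same pattern: resolve the ``{source-id/name}`` argument through ``harvest_source_show``, then pass the canonical ``source['id']`` to the action. A toy version of that lookup (illustrative data, not the real action):

```python
# Stand-in for the harvest source table; ids and names are made up.
SOURCES = {'abc-123': {'id': 'abc-123', 'name': 'my-source'}}

def source_show(id_or_name):
    """Accept either an id or a name, like harvest_source_show."""
    for source in SOURCES.values():
        if id_or_name in (source['id'], source['name']):
            return source
    raise KeyError(id_or_name)
```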
@@ -283,13 +333,17 @@ class Harvester(CkanCommand):

     def create_harvest_job(self):
         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
         else:
             print 'Please provide a source id'
             sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})

-        context = {'model': model,'session':model.Session, 'user': self.admin_user['name']}
-        job = get_action('harvest_job_create')(context,{'source_id':source_id})
+        job = get_action('harvest_job_create')(
+            context, {'source_id': source['id']})

         self.print_harvest_job(job)
         jobs = get_action('harvest_job_list')(context,{'status':u'New'})
@@ -302,6 +356,23 @@ class Harvester(CkanCommand):
         self.print_harvest_jobs(jobs)
         self.print_there_are(what='harvest job', sequence=jobs)

+    def job_abort(self):
+        if len(self.args) >= 2:
+            source_id_or_name = unicode(self.args[1])
+        else:
+            print 'Please provide a source id'
+            sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+
+        context = {'model': model, 'user': self.admin_user['name'],
+                   'session': model.Session}
+        job = get_action('harvest_job_abort')(context,
+                                              {'source_id': source['id']})
+        print 'Job status: {0}'.format(job['status'])
+
     def run_harvester(self):
         context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
         try:
@@ -309,18 +380,69 @@ class Harvester(CkanCommand):
         except NoNewHarvestJobError:
             print 'There are no new harvest jobs to run.'

+    def run_test_harvest(self):
+        from ckanext.harvest import queue
+        from ckanext.harvest.tests import lib
+        from ckanext.harvest.logic import HarvestJobExists
+        from ckanext.harvest.model import HarvestJob
+
+        # Determine the source
+        if len(self.args) >= 2:
+            source_id_or_name = unicode(self.args[1])
+        else:
+            print 'Please provide a source id'
+            sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+
+        # Determine the job
+        try:
+            job_dict = get_action('harvest_job_create')(
+                context, {'source_id': source['id']})
+        except HarvestJobExists:
+            running_jobs = get_action('harvest_job_list')(
+                context, {'source_id': source['id'], 'status': 'Running'})
+            if running_jobs:
+                print '\nSource "%s" apparently has a "Running" job:\n%r' \
+                    % (source.get('name') or source['id'], running_jobs)
+                resp = raw_input('Abort it? (y/n)')
+                if not resp.lower().startswith('y'):
+                    sys.exit(1)
+                job_dict = get_action('harvest_job_abort')(
+                    context, {'source_id': source['id']})
+            else:
+                print 'Reusing existing harvest job'
+                jobs = get_action('harvest_job_list')(
+                    context, {'source_id': source['id'], 'status': 'New'})
+                assert len(jobs) == 1, \
+                    'Multiple "New" jobs for this source! %r' % jobs
+                job_dict = jobs[0]
+        job_obj = HarvestJob.get(job_dict['id'])
+
+        harvester = queue.get_harvester(source['source_type'])
+        assert harvester, \
+            'No harvester found for type: %s' % source['source_type']
+        lib.run_harvest_job(job_obj, harvester)
+
     def import_stage(self):

         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
+            context = {'model': model, 'session': model.Session,
+                       'user': self.admin_user['name']}
+            source = get_action('harvest_source_show')(
+                context, {'id': source_id_or_name})
+            source_id = source['id']
         else:
             source_id = None

-        context = {'model': model, 'session':model.Session, 'user': self.admin_user['name'],
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name'],
                    'join_datasets': not self.options.no_join_datasets,
                    'segments': self.options.segments}

         objs_count = get_action('harvest_objects_import')(context,{
             'source_id': source_id,
             'harvest_object_id': self.options.harvest_object_id,
@@ -347,9 +469,16 @@ class Harvester(CkanCommand):

     def print_harvest_source(self, source):
         print 'Source id: %s' % source.get('id')
+        if 'name' in source:
+            # 'name' is only there if the source comes from the Package
+            print '     name: %s' % source.get('name')
         print '      url: %s' % source.get('url')
-        print '     type: %s' % source.get('type')
-        print '   active: %s' % (source.get('active', source.get('state') == 'active'))
+        # 'type' if source comes from HarvestSource, 'source_type' if it comes
+        # from the Package
+        print '     type: %s' % (source.get('source_type') or
+                                 source.get('type'))
+        print '   active: %s' % (source.get('active',
+                                            source.get('state') == 'active'))
         print 'frequency: %s' % source.get('frequency')
         print '     jobs: %s' % source.get('status').get('job_count')
         print ''
@@ -70,9 +70,9 @@ def harvest_source_create(context,data_dict):
     return source


-def harvest_job_create(context,data_dict):
+def harvest_job_create(context, data_dict):
     log.info('Harvest job create: %r', data_dict)
-    check_access('harvest_job_create',context,data_dict)
+    check_access('harvest_job_create', context, data_dict)

     source_id = data_dict['source_id']
@@ -84,13 +84,16 @@ def harvest_job_create(context,data_dict):

     # Check if the source is active
     if not source.active:
-        log.warn('Harvest job cannot be created for inactive source %s', source_id)
+        log.warn('Harvest job cannot be created for inactive source %s',
+                 source_id)
         raise Exception('Can not create jobs on inactive sources')

-    # Check if there already is an unrun or currently running job for this source
+    # Check if there already is an unrun or currently running job for this
+    # source
     exists = _check_for_existing_jobs(context, source_id)
     if exists:
-        log.warn('There is already an unrun job %r for this source %s', exists, source_id)
+        log.warn('There is already an unrun job %r for this source %s',
+                 exists, source_id)
         raise HarvestJobExists('There already is an unrun job for this source')

     job = HarvestJob()
@@ -98,7 +101,8 @@ def harvest_job_create(context,data_dict):

     job.save()
     log.info('Harvest job saved %s', job.id)
-    return harvest_job_dictize(job,context)
+    return harvest_job_dictize(job, context)


 def harvest_job_create_all(context,data_dict):
     log.info('Harvest job create all: %r', data_dict)
@@ -20,7 +20,7 @@ from ckanext.harvest.logic.dictization import (harvest_source_dictize,
 log = logging.getLogger(__name__)

 @side_effect_free
-def harvest_source_show(context,data_dict):
+def harvest_source_show(context, data_dict):
     '''
     Returns the metadata of a harvest source
@@ -234,6 +234,11 @@ def harvest_job_report(context, data_dict):

 @side_effect_free
 def harvest_job_list(context,data_dict):
+    '''Returns a list of jobs and details of objects and errors.
+
+    :param status: filter by e.g. "New" or "Finished" jobs
+    :param source_id: filter by a harvest source
+    '''

     check_access('harvest_job_list',context,data_dict)
@@ -315,6 +320,7 @@ def harvest_object_list(context,data_dict):

 @side_effect_free
 def harvesters_info_show(context,data_dict):
+    '''Returns details of the installed harvesters.'''

     check_access('harvesters_info_show',context,data_dict)
@@ -27,7 +27,7 @@ from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs

 from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
 from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError
-from ckanext.harvest.logic.schema import harvest_source_show_package_schema
+from ckanext.harvest.logic.dictization import harvest_job_dictize

 from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
@@ -83,10 +83,12 @@ def harvest_source_update(context,data_dict):

     return source

-def harvest_source_clear(context,data_dict):
+
+def harvest_source_clear(context, data_dict):
     '''
-    Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself.
-    This is useful to clean history of long running harvest sources to start again fresh.
+    Clears all datasets, jobs and objects related to a harvest source, but
+    keeps the source itself. This is useful to clean history of long running
+    harvest sources to start again fresh.

     :param id: the id of the harvest source to clear
     :type id: string
@@ -94,7 +96,7 @@ def harvest_source_clear(context,data_dict):
     '''
     check_access('harvest_source_clear',context,data_dict)

-    harvest_source_id = data_dict.get('id',None)
+    harvest_source_id = data_dict.get('id', None)

     source = HarvestSource.get(harvest_source_id)
     if not source:
@@ -182,6 +184,14 @@ def harvest_source_clear(context,data_dict):
     return {'id': harvest_source_id}

 def harvest_source_index_clear(context,data_dict):
+    '''
+    Clears all datasets, jobs and objects related to a harvest source, but
+    keeps the source itself. This is useful to clean history of long running
+    harvest sources to start again fresh.
+
+    :param id: the id of the harvest source to clear
+    :type id: string
+    '''

     check_access('harvest_source_clear',context,data_dict)
     harvest_source_id = data_dict.get('id',None)
@@ -208,14 +218,26 @@ def harvest_source_index_clear(context,data_dict):

     return {'id': harvest_source_id}

-def harvest_objects_import(context,data_dict):
+
+def harvest_objects_import(context, data_dict):
     '''
-    Reimports the current harvest objects
-    It performs the import stage with the last fetched objects, optionally
-    belonging to a certain source.
-    Please note that no objects will be fetched from the remote server.
-    It will only affect the last fetched objects already present in the
-    database.
+    Reimports the existing harvest objects, specified by either source_id,
+    harvest_object_id or package_id.
+
+    It performs the import stage with the last fetched objects, optionally
+    belonging to a certain source.
+
+    Please note that no objects will be fetched from the remote server.
+
+    It will only affect the last fetched objects already present in the
+    database.
+
+    :param source_id: the id of the harvest source to import
+    :type source_id: string
+    :param harvest_object_id: the id of the harvest object to import
+    :type harvest_object_id: string
+    :param package_id: the id or name of the package to import
+    :type package_id: string
     '''
     log.info('Harvest objects import: %r', data_dict)
     check_access('harvest_objects_import',context,data_dict)
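The three parameters documented in the new docstring are alternative filters for the import stage. One way to picture the dispatch (a sketch only; the real action builds a database query from these values, and its precedence rules may differ):

```python
def import_filter(source_id=None, harvest_object_id=None, package_id=None):
    """Return which filter the import stage would apply.

    Mirrors the source_id / harvest_object_id / package_id options from
    harvest_objects_import above; the first value provided wins in this
    sketch, which is an illustrative assumption.
    """
    for key, value in (('source_id', source_id),
                       ('harvest_object_id', harvest_object_id),
                       ('package_id', package_id)):
        if value:
            return {key: value}
    return None  # no filter: reimport the last fetched objects everywhere
```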
@@ -397,6 +419,64 @@ def harvest_jobs_run(context,data_dict):
     return sent_jobs


+def harvest_job_abort(context, data_dict):
+    '''
+    Aborts a harvest job. Given a harvest source_id, it looks for the latest
+    one and (assuming it not already Finished) marks it as Finished. It also
+    marks any of that source's harvest objects and (if not complete or error)
+    marks them "ERROR", so any left in limbo are cleaned up. Does not actually
+    stop running any queued harvest fetchs/objects.
+
+    :param source_id: the name or id of the harvest source with a job to abort
+    :type source_id: string
+    '''
+
+    check_access('harvest_job_abort', context, data_dict)
+
+    model = context['model']
+
+    source_id = data_dict.get('source_id', None)
+    source = harvest_source_show(context, {'id': source_id})
+
+    # HarvestJob set status to 'Finished'
+    # Don not use harvest_job_list since it can use a lot of memory
+    last_job = model.Session.query(HarvestJob) \
+                    .filter_by(source_id=source['id']) \
+                    .order_by(HarvestJob.created.desc()).first()
+    if not last_job:
+        raise NotFound('Error: source has no jobs')
+    job = get_action('harvest_job_show')(context,
+                                         {'id': last_job.id})
+
+    if job['status'] != 'Finished':
+        # i.e. New or Running
+        job_obj = HarvestJob.get(job['id'])
+        job_obj.status = new_status = 'Finished'
+        model.repo.commit_and_remove()
+        log.info('Harvest job changed status from "%s" to "%s"',
+                 job['status'], new_status)
+    else:
+        log.info('Harvest job unchanged. Source %s status is: "%s"',
+                 job['id'], job['status'])
+
+    # HarvestObjects set to ERROR
+    job_obj = HarvestJob.get(job['id'])
+    objs = job_obj.objects
+    for obj in objs:
+        if obj.state not in ('COMPLETE', 'ERROR'):
+            old_state = obj.state
+            obj.state = 'ERROR'
+            log.info('Harvest object changed state from "%s" to "%s": %s',
+                     old_state, obj.state, obj.id)
+        else:
+            log.info('Harvest object not changed from "%s": %s',
+                     obj.state, obj.id)
+    model.repo.commit_and_remove()
+
+    job_obj = HarvestJob.get(job['id'])
+    return harvest_job_dictize(job_obj, context)
+
+
 @logic.side_effect_free
 def harvest_sources_reindex(context, data_dict):
     '''
@@ -434,7 +514,8 @@ def harvest_source_reindex(context, data_dict):
     context.update({'ignore_auth': True})
     package_dict = logic.get_action('harvest_source_show')(context,
                                                            {'id': harvest_source_id})
-    log.debug('Updating search index for harvest source {0}'.format(harvest_source_id))
+    log.debug('Updating search index for harvest source: {0}'.format(
+        package_dict.get('name') or harvest_source_id))

     # Remove configuration values
     new_dict = {}
@@ -58,6 +58,16 @@ def harvest_jobs_run(context, data_dict):
     else:
         return {'success': True}


+def harvest_job_abort(context, data_dict):
+    '''
+    Authorization check for aborting a running harvest job
+
+    Same permissions as running one
+    '''
+    return harvest_jobs_run(context, data_dict)
+
+
 def harvest_sources_reindex(context, data_dict):
     '''
     Authorization check for reindexing all harvest sources
@ -118,7 +118,13 @@ class HarvestSource(HarvestDomainObject):
|
|||
     or "inactive". The harvesting processes are not fired on inactive
     sources.
     '''
-    pass
+    def __repr__(self):
+        return '<HarvestSource id=%s title=%s url=%s active=%r>' % \
+               (self.id, self.title, self.url, self.active)
+
+    def __str__(self):
+        return self.__repr__().encode('ascii', 'ignore')
+


 class HarvestJob(HarvestDomainObject):
     '''A Harvesting Job is performed in two phases. In first place, the
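The `__repr__`/`__str__` pair added above gives harvest sources a readable debug representation. A sketch of the same pattern on a stub class (note that the original `__str__` uses `encode('ascii', 'ignore')`, a Python 2 idiom; under Python 3 that call would return `bytes`):

```python
# Stub class illustrating the repr pattern used for HarvestSource.
class Source:
    def __init__(self, id, title, url, active):
        self.id, self.title, self.url, self.active = id, title, url, active

    def __repr__(self):
        # %r on the boolean renders True/False, matching the original format.
        return '<Source id=%s title=%s url=%s active=%r>' % (
            self.id, self.title, self.url, self.active)

s = Source('1', 'Data', 'http://example.com', True)
assert repr(s) == '<Source id=1 title=Data url=http://example.com active=True>'
```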

@@ -14,8 +14,9 @@ from ckanext.harvest.interfaces import IHarvester
 log = logging.getLogger(__name__)
 assert not log.disabled

-__all__ = ['get_gather_publisher', 'get_gather_consumer', \
-           'get_fetch_publisher', 'get_fetch_consumer']
+__all__ = ['get_gather_publisher', 'get_gather_consumer',
+           'get_fetch_publisher', 'get_fetch_consumer',
+           'get_harvester']

 PORT = 5672
 USERID = 'guest'

@@ -222,42 +223,38 @@ def gather_callback(channel, method, header, body):
         # Send the harvest job to the plugins that implement
         # the Harvester interface, only if the source type
         # matches
-        harvester_found = False
-        for harvester in PluginImplementations(IHarvester):
-            if harvester.info()['name'] == job.source.type:
-                harvester_found = True
+        harvester = get_harvester(job.source.type)

-                try:
-                    harvest_object_ids = gather_stage(harvester, job)
-                except (Exception, KeyboardInterrupt):
-                    channel.basic_ack(method.delivery_tag)
-                    raise
+        if harvester:
+            try:
+                harvest_object_ids = gather_stage(harvester, job)
+            except (Exception, KeyboardInterrupt):
+                channel.basic_ack(method.delivery_tag)
+                raise

-                if not isinstance(harvest_object_ids, list):
-                    log.error('Gather stage failed')
-                    publisher.close()
-                    channel.basic_ack(method.delivery_tag)
-                    return False
+            if not isinstance(harvest_object_ids, list):
+                log.error('Gather stage failed')
+                publisher.close()
+                channel.basic_ack(method.delivery_tag)
+                return False

-                if len(harvest_object_ids) == 0:
-                    log.info('No harvest objects to fetch')
-                    publisher.close()
-                    channel.basic_ack(method.delivery_tag)
-                    return False
+            if len(harvest_object_ids) == 0:
+                log.info('No harvest objects to fetch')
+                publisher.close()
+                channel.basic_ack(method.delivery_tag)
+                return False

-                log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
-                    len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
-                for id in harvest_object_ids:
-                    # Send the id to the fetch queue
-                    publisher.send({'harvest_object_id': id})
-                log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
+            log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
+                len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
+            for id in harvest_object_ids:
+                # Send the id to the fetch queue
+                publisher.send({'harvest_object_id': id})
+            log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))

-        if not harvester_found:
+        else:
             # This can occur if you:
-            # * remove a harvester and it still has sources that are then
-            #   refreshed
-            # * add a new harvester and restart CKAN but not the gather
-            #   queue.
+            # * remove a harvester and it still has sources that are then refreshed
+            # * add a new harvester and restart CKAN but not the gather queue.
             msg = 'System error - No harvester could be found for source type %s' % job.source.type
             err = HarvestGatherError(message=msg, job=job)
             err.save()
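The reworked callback replaces the `harvester_found` flag-and-loop with a single lookup followed by an `if harvester: ... else:` branch. A compact sketch of that control flow with hypothetical stand-in callables instead of the queue, plugins, and logging:

```python
# Hypothetical sketch: gather() stands in for the plugin's gather_stage,
# send() for publisher.send; return values summarize each exit path.
def process(harvester, gather, send):
    if harvester is None:
        return 'no-harvester'   # the else: branch above
    ids = gather()
    if not isinstance(ids, list):
        return 'failed'         # gather stage did not return a list
    if len(ids) == 0:
        return 'empty'          # nothing to fetch
    for id in ids:
        send({'harvest_object_id': id})
    return 'sent %d' % len(ids)

assert process(object(), lambda: ['a', 'b'], lambda m: None) == 'sent 2'
assert process(None, lambda: ['a'], lambda m: None) == 'no-harvester'
assert process(object(), lambda: None, lambda m: None) == 'failed'
assert process(object(), lambda: [], lambda m: None) == 'empty'
```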

@@ -268,6 +265,12 @@ def gather_callback(channel, method, header, body):
     channel.basic_ack(method.delivery_tag)


+def get_harvester(harvest_source_type):
+    for harvester in PluginImplementations(IHarvester):
+        if harvester.info()['name'] == harvest_source_type:
+            return harvester
+
+
 def gather_stage(harvester, job):
     '''Calls the harvester's gather_stage, returning harvest object ids, with
     some error handling.
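The new `get_harvester` helper scans the registered plugins and returns the first whose `info()['name']` matches the source type, falling through to an implicit `None` when nothing matches. The same lookup, sketched standalone with a stubbed plugin list in place of CKAN's `PluginImplementations(IHarvester)`:

```python
# Stub harvester exposing the info() contract used by the lookup.
class StubHarvester:
    def __init__(self, name):
        self._name = name

    def info(self):
        return {'name': self._name}

# Stand-in for PluginImplementations(IHarvester).
PLUGINS = [StubHarvester('ckan'), StubHarvester('csw')]

def get_harvester(harvest_source_type):
    for harvester in PLUGINS:
        if harvester.info()['name'] == harvest_source_type:
            return harvester
    # Falls through: returns None when no harvester matches.

assert get_harvester('csw').info()['name'] == 'csw'
assert get_harvester('unknown') is None
```

Returning `None` rather than raising is what lets the caller branch on `if harvester:` and record a `HarvestGatherError` in the `else:` path.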