Merge branch 'master' into db-error

Conflicts:
	ckanext/harvest/queue.py
This commit is contained in:
David Read 2015-11-03 00:57:14 +00:00
commit 59be6e2c71
11 changed files with 480 additions and 112 deletions

View File

@ -31,22 +31,25 @@ Installation
ckan.harvest.mq.type = amqp ckan.harvest.mq.type = amqp
2. Activate your CKAN virtual environment, for example::
2. Install the extension into your python environment:: $ . /usr/lib/ckan/default/bin/activate
3. Install the ckanext-harvest Python package into your virtual environment::
(pyenv) $ pip install -e git+https://github.com/ckan/ckanext-harvest.git#egg=ckanext-harvest (pyenv) $ pip install -e git+https://github.com/ckan/ckanext-harvest.git#egg=ckanext-harvest
3. Install the rest of python modules required by the extension:: 4. Install the python modules required by the extension::
(pyenv) $ pip install -r pip-requirements.txt (pyenv) $ pip install -r pip-requirements.txt
4. Make sure the CKAN configuration ini file contains the harvest main plugin, as 5. Make sure the CKAN configuration ini file contains the harvest main plugin, as
well as the harvester for CKAN instances if you need it (included with the extension):: well as the harvester for CKAN instances if you need it (included with the extension)::
ckan.plugins = harvest ckan_harvester ckan.plugins = harvest ckan_harvester
5. If you haven't done it yet on the previous step, define the backend that you are using with the ``ckan.harvest.mq.type`` 6. If you haven't done it yet on the previous step, define the backend that you
option (it defaults to ``rabbitmq``):: are using with the ``ckan.harvest.mq.type`` option (it defaults to ``amqp``)::
ckan.harvest.mq.type = redis ckan.harvest.mq.type = redis
@ -96,24 +99,44 @@ The following operations can be run from the command line using the
harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}] harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
- create new harvest source - create new harvest source
harvester rmsource {id} harvester source {source-id/name}
- remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects - shows a harvest source
harvester clearsource {id} harvester rmsource {source-id/name}
- clears all datasets, jobs and objects related to a harvest source, but keeps the source itself - remove (deactivate) a harvester source, whilst leaving any related
datasets, jobs and objects
harvester clearsource {source-id/name}
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself
harvester sources [all] harvester sources [all]
- lists harvest sources - lists harvest sources
If 'all' is defined, it also shows the Inactive sources If 'all' is defined, it also shows the Inactive sources
harvester job {source-id} harvester job {source-id/name}
- create new harvest job - create new harvest job
harvester jobs harvester jobs
- lists harvest jobs - lists harvest jobs
harvester job_abort {source-id/name}
- marks a job as "Aborted" so that the source can be restarted afresh.
It ensures that the job's harvest objects status are also marked
finished. You should ensure that neither the job nor its objects are
currently in the gather/fetch queues.
harvester run harvester run
- runs harvest jobs - starts any harvest jobs that have been created by putting them onto
the gather queue. Also checks running jobs and if finished, it
changes their status to Finished.
harvester run_test {source-id/name}
- runs a harvest - for testing only.
This does all the stages of the harvest (creates job, gather, fetch,
import) without involving the web UI or the queue backends. This is
useful for testing a harvester without having to fire up
gather/fetch_consumer processes, as is done in production.
harvester gather_consumer harvester gather_consumer
- starts the consumer for the gathering queue - starts the consumer for the gathering queue
@ -123,14 +146,22 @@ The following operations can be run from the command line using the
harvester purge_queues harvester purge_queues
- removes all jobs from fetch and gather queue - removes all jobs from fetch and gather queue
WARNING: if using Redis, this command purges all data in the current
Redis database
harvester [-j] [--segments={segments}] import [{source-id}] harvester [-j] [-o] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, optionally belonging to a certain source. - perform the import stage with the last fetched objects, for a certain
Please note that no objects will be fetched from the remote server. It will only affect source or a single harvest object. Please note that no objects will
the last fetched objects already present in the database. be fetched from the remote server. It will only affect the objects
already present in the database.
If the -j flag is provided, the objects are not joined to existing datasets. This may be useful To import a particular harvest source, specify its id as an argument.
when importing objects for the first time. To import a particular harvest object use the -o option.
To import a particular package use the -p option.
You will need to specify the -j flag in cases where the datasets are
not yet created (e.g. first harvest, or all previous harvests have
failed)
The --segments flag allows to define a string containing hex digits that represent which of The --segments flag allows to define a string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
@ -233,6 +264,22 @@ field. The currently supported configuration options are:
lower-case ones, and spaces replaced with dashes. Setting this option to False lower-case ones, and spaces replaced with dashes. Setting this option to False
gives the same effect as leaving it unset. gives the same effect as leaving it unset.
* organizations_filter_include: This configuration option allows you to specify
a list of remote organization names (e.g. "arkansas-gov" is the name for
organization http://catalog.data.gov/organization/arkansas-gov ). If this
property has a value then only datasets that are in one of these organizations
will be harvested. All other datasets will be skipped. Only one of
organizations_filter_include or organizations_filter_exclude should be
configured.
* organizations_filter_exclude: This configuration option allows you to specify
a list of remote organization names (e.g. "arkansas-gov" is the name for
organization http://catalog.data.gov/organization/arkansas-gov ). If this
property is set then all datasets from the remote source will be harvested
unless they belong to one of the organizations in this option. Only one of
organizations_filter_exclude or organizations_filter_include should be
configured.
Here is an example of a configuration object (the one that must be entered in Here is an example of a configuration object (the one that must be entered in
the configuration field):: the configuration field)::
@ -242,6 +289,8 @@ the configuration field)::
"default_groups":["my-own-group"], "default_groups":["my-own-group"],
"default_extras":{"new_extra":"Test","harvest_url":"{harvest_source_url}/dataset/{dataset_id}"}, "default_extras":{"new_extra":"Test","harvest_url":"{harvest_source_url}/dataset/{dataset_id}"},
"override_extras": true, "override_extras": true,
"organizations_filter_include": [],
"organizations_filter_exclude": ["remote-organization"],
"user":"harverster-user", "user":"harverster-user",
"api_key":"<REMOTE_API_KEY>", "api_key":"<REMOTE_API_KEY>",
"read_only": true, "read_only": true,
@ -405,14 +454,43 @@ Here you can also find other examples of custom harvesters:
* https://github.com/ckan/ckanext-dcat/tree/master/ckanext/dcat/harvesters * https://github.com/ckan/ckanext-dcat/tree/master/ckanext/dcat/harvesters
* https://github.com/ckan/ckanext-spatial/tree/master/ckanext/spatial/harvesters * https://github.com/ckan/ckanext-spatial/tree/master/ckanext/spatial/harvesters
Running the harvest jobs Running the harvest jobs
======================== ========================
The harvesting extension uses two different queues, one that handles the There are two ways to run a harvest::
gathering and another one that handles the fetching and importing. To start
the consumers run the following command 1. ``harvester run_test`` for the command-line, suitable for testing
(make sure you have your python environment activated):: 2. ``harvester run`` used by the Web UI and scheduled runs
harvester run_test
------------------
You can run a harvester simply using the ``run_test`` command. This is handy
for running a harvest with one command in the console and see all the output
in-line. It runs the gather, fetch and import stages all in the same process.
This is useful for developing a harvester because you can insert break-points
in your harvester, and rerun a harvest without having to restart the
gather_consumer and fetch_consumer processes each time. In addition, because it
doesn't use the queue backends it doesn't interfere with harvests of other
sources that may be going on in the background.
However running this way, if gather_stage, fetch_stage or import_stage raise an
exception, they are not caught, whereas with ``harvester run`` they are handled
slightly differently as they are called by queue.py. So when testing this
aspect it's best to use ``harvester run``.
harvester run
-------------
When a harvest job is started by a user in the Web UI, or by a scheduled
harvest, the harvest is started by the ``harvester run`` command. This is the
normal method in production systems and scales well.
In this case, the harvesting extension uses two different queues: one that
handles the gathering and another one that handles the fetching and importing.
To start the consumers run the following command (make sure you have your
python environment activated)::
paster --plugin=ckanext-harvest harvester gather_consumer --config=mysite.ini paster --plugin=ckanext-harvest harvester gather_consumer --config=mysite.ini
@ -429,9 +507,19 @@ The ``run`` command not only starts any pending harvesting jobs, but also
flags those that are finished, allowing new jobs to be created on that particular flags those that are finished, allowing new jobs to be created on that particular
source and refreshing the source statistics. That means that you will need to run source and refreshing the source statistics. That means that you will need to run
this command before being able to create a new job on a source that was being this command before being able to create a new job on a source that was being
harvested (On a production site you will typically have a cron job that runs the harvested. (On a production site you will typically have a cron job that runs the
command regularly, see next section). command regularly, see next section).
Occasionally you can find a harvesting job is in a "limbo state" where the job
has run with errors but the ``harvester run`` command will not mark it as
finished, and therefore you cannot run another job. This is due to a particular
harvester not handling errors correctly, e.g. during development. In this
circumstance, ensure that the gather & fetch consumers are running and have
nothing more to consume, and then run this abort command with the name or id of
the harvest source::
paster --plugin=ckanext-harvest harvester job_abort {source-id/name} --config=mysite.ini
Setting up the harvesters on a production server Setting up the harvesters on a production server
================================================ ================================================

View File

@ -18,24 +18,44 @@ class Harvester(CkanCommand):
harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}] harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
- create new harvest source - create new harvest source
harvester rmsource {id} harvester source {source-id/name}
- remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects - shows a harvest source
harvester clearsource {id} harvester rmsource {source-id/name}
- clears all datasets, jobs and objects related to a harvest source, but keeps the source itself - remove (deactivate) a harvester source, whilst leaving any related
datasets, jobs and objects
harvester clearsource {source-id/name}
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself
harvester sources [all] harvester sources [all]
- lists harvest sources - lists harvest sources
If 'all' is defined, it also shows the Inactive sources If 'all' is defined, it also shows the Inactive sources
harvester job {source-id} harvester job {source-id/name}
- create new harvest job - create new harvest job
harvester jobs harvester jobs
- lists harvest jobs - lists harvest jobs
harvester job_abort {source-id/name}
- marks a job as "Aborted" so that the source can be restarted afresh.
It ensures that the job's harvest objects status are also marked
finished. You should ensure that neither the job nor its objects are
currently in the gather/fetch queues.
harvester run harvester run
- runs harvest jobs - starts any harvest jobs that have been created by putting them onto
the gather queue. Also checks running jobs and if finished, it
changes their status to Finished.
harvester run_test {source-id/name}
- runs a harvest - for testing only.
This does all the stages of the harvest (creates job, gather, fetch,
import) without involving the web UI or the queue backends. This is
useful for testing a harvester without having to fire up
gather/fetch_consumer processes, as is done in production.
harvester gather_consumer harvester gather_consumer
- starts the consumer for the gathering queue - starts the consumer for the gathering queue
@ -54,10 +74,13 @@ class Harvester(CkanCommand):
be fetched from the remote server. It will only affect the objects be fetched from the remote server. It will only affect the objects
already present in the database. already present in the database.
To perform it on a particular object use the -o flag. To import a particular harvest source, specify its id as an argument.
To import a particular harvest object use the -o option.
To import a particular package use the -p option.
If the -j flag is provided, the objects are not joined to existing datasets. This may be useful You will need to specify the -j flag in cases where the datasets are
when importing objects for the first time. not yet created (e.g. first harvest, or all previous harvests have
failed)
The --segments flag allows to define a string containing hex digits that represent which of The --segments flag allows to define a string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
@ -115,7 +138,10 @@ class Harvester(CkanCommand):
sys.exit(1) sys.exit(1)
cmd = self.args[0] cmd = self.args[0]
if cmd == 'source': if cmd == 'source':
if len(self.args) > 2:
self.create_harvest_source() self.create_harvest_source()
else:
self.show_harvest_source()
elif cmd == 'rmsource': elif cmd == 'rmsource':
self.remove_harvest_source() self.remove_harvest_source()
elif cmd == 'clearsource': elif cmd == 'clearsource':
@ -126,8 +152,12 @@ class Harvester(CkanCommand):
self.create_harvest_job() self.create_harvest_job()
elif cmd == 'jobs': elif cmd == 'jobs':
self.list_harvest_jobs() self.list_harvest_jobs()
elif cmd == 'job_abort':
self.job_abort()
elif cmd == 'run': elif cmd == 'run':
self.run_harvester() self.run_harvester()
elif cmd == 'run_test':
self.run_test_harvest()
elif cmd == 'gather_consumer': elif cmd == 'gather_consumer':
import logging import logging
from ckanext.harvest.queue import (get_gather_consumer, from ckanext.harvest.queue import (get_gather_consumer,
@ -240,7 +270,8 @@ class Harvester(CkanCommand):
# Create a harvest job for the new source if not regular job. # Create a harvest job for the new source if not regular job.
if not data_dict['frequency']: if not data_dict['frequency']:
get_action('harvest_job_create')(context,{'source_id':source['id']}) get_action('harvest_job_create')(
context, {'source_id': source['id']})
print 'A new Harvest Job for this source has also been created' print 'A new Harvest Job for this source has also been created'
except ValidationError,e: except ValidationError,e:
@ -248,25 +279,44 @@ class Harvester(CkanCommand):
print str(e.error_dict) print str(e.error_dict)
raise e raise e
def show_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source name'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
self.print_harvest_source(source)
def remove_harvest_source(self): def remove_harvest_source(self):
if len(self.args) >= 2: if len(self.args) >= 2:
source_id = unicode(self.args[1]) source_id_or_name = unicode(self.args[1])
else: else:
print 'Please provide a source id' print 'Please provide a source id'
sys.exit(1) sys.exit(1)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session} context = {'model': model, 'session': model.Session,
get_action('harvest_source_delete')(context,{'id':source_id}) 'user': self.admin_user['name']}
print 'Removed harvest source: %s' % source_id source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
get_action('harvest_source_delete')(context, {'id': source['id']})
print 'Removed harvest source: %s' % source_id_or_name
def clear_harvest_source(self): def clear_harvest_source(self):
if len(self.args) >= 2: if len(self.args) >= 2:
source_id = unicode(self.args[1]) source_id_or_name = unicode(self.args[1])
else: else:
print 'Please provide a source id' print 'Please provide a source id'
sys.exit(1) sys.exit(1)
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session} context = {'model': model, 'session': model.Session,
get_action('harvest_source_clear')(context,{'id':source_id}) 'user': self.admin_user['name']}
print 'Cleared harvest source: %s' % source_id source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
get_action('harvest_source_clear')(context, {'id': source['id']})
print 'Cleared harvest source: %s' % source_id_or_name
def list_harvest_sources(self): def list_harvest_sources(self):
if len(self.args) >= 2 and self.args[1] == 'all': if len(self.args) >= 2 and self.args[1] == 'all':
@ -283,13 +333,17 @@ class Harvester(CkanCommand):
def create_harvest_job(self): def create_harvest_job(self):
if len(self.args) >= 2: if len(self.args) >= 2:
source_id = unicode(self.args[1]) source_id_or_name = unicode(self.args[1])
else: else:
print 'Please provide a source id' print 'Please provide a source id'
sys.exit(1) sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
context = {'model': model,'session':model.Session, 'user': self.admin_user['name']} job = get_action('harvest_job_create')(
job = get_action('harvest_job_create')(context,{'source_id':source_id}) context, {'source_id': source['id']})
self.print_harvest_job(job) self.print_harvest_job(job)
jobs = get_action('harvest_job_list')(context,{'status':u'New'}) jobs = get_action('harvest_job_list')(context,{'status':u'New'})
@ -302,6 +356,23 @@ class Harvester(CkanCommand):
self.print_harvest_jobs(jobs) self.print_harvest_jobs(jobs)
self.print_there_are(what='harvest job', sequence=jobs) self.print_there_are(what='harvest job', sequence=jobs)
def job_abort(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
context = {'model': model, 'user': self.admin_user['name'],
'session': model.Session}
job = get_action('harvest_job_abort')(context,
{'source_id': source['id']})
print 'Job status: {0}'.format(job['status'])
def run_harvester(self): def run_harvester(self):
context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session} context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
try: try:
@ -309,18 +380,69 @@ class Harvester(CkanCommand):
except NoNewHarvestJobError: except NoNewHarvestJobError:
print 'There are no new harvest jobs to run.' print 'There are no new harvest jobs to run.'
def run_test_harvest(self):
from ckanext.harvest import queue
from ckanext.harvest.tests import lib
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.model import HarvestJob
# Determine the source
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
# Determine the job
try:
job_dict = get_action('harvest_job_create')(
context, {'source_id': source['id']})
except HarvestJobExists:
running_jobs = get_action('harvest_job_list')(
context, {'source_id': source['id'], 'status': 'Running'})
if running_jobs:
print '\nSource "%s" apparently has a "Running" job:\n%r' \
% (source.get('name') or source['id'], running_jobs)
resp = raw_input('Abort it? (y/n)')
if not resp.lower().startswith('y'):
sys.exit(1)
job_dict = get_action('harvest_job_abort')(
context, {'source_id': source['id']})
else:
print 'Reusing existing harvest job'
jobs = get_action('harvest_job_list')(
context, {'source_id': source['id'], 'status': 'New'})
assert len(jobs) == 1, \
'Multiple "New" jobs for this source! %r' % jobs
job_dict = jobs[0]
job_obj = HarvestJob.get(job_dict['id'])
harvester = queue.get_harvester(source['source_type'])
assert harvester, \
'No harvester found for type: %s' % source['source_type']
lib.run_harvest_job(job_obj, harvester)
def import_stage(self): def import_stage(self):
if len(self.args) >= 2: if len(self.args) >= 2:
source_id = unicode(self.args[1]) source_id_or_name = unicode(self.args[1])
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
source_id = source['id']
else: else:
source_id = None source_id = None
context = {'model': model, 'session':model.Session, 'user': self.admin_user['name'], context = {'model': model, 'session': model.Session,
'user': self.admin_user['name'],
'join_datasets': not self.options.no_join_datasets, 'join_datasets': not self.options.no_join_datasets,
'segments': self.options.segments} 'segments': self.options.segments}
objs_count = get_action('harvest_objects_import')(context,{ objs_count = get_action('harvest_objects_import')(context,{
'source_id': source_id, 'source_id': source_id,
'harvest_object_id': self.options.harvest_object_id, 'harvest_object_id': self.options.harvest_object_id,
@ -347,9 +469,16 @@ class Harvester(CkanCommand):
def print_harvest_source(self, source): def print_harvest_source(self, source):
print 'Source id: %s' % source.get('id') print 'Source id: %s' % source.get('id')
if 'name' in source:
# 'name' is only there if the source comes from the Package
print ' name: %s' % source.get('name')
print ' url: %s' % source.get('url') print ' url: %s' % source.get('url')
print ' type: %s' % source.get('type') # 'type' if source comes from HarvestSource, 'source_type' if it comes
print ' active: %s' % (source.get('active', source.get('state') == 'active')) # from the Package
print ' type: %s' % (source.get('source_type') or
source.get('type'))
print ' active: %s' % (source.get('active',
source.get('state') == 'active'))
print 'frequency: %s' % source.get('frequency') print 'frequency: %s' % source.get('frequency')
print ' jobs: %s' % source.get('status').get('job_count') print ' jobs: %s' % source.get('status').get('job_count')
print '' print ''

View File

@ -160,6 +160,26 @@ class CKANHarvester(HarvesterBase):
base_rest_url = base_url + self._get_rest_api_offset() base_rest_url = base_url + self._get_rest_api_offset()
base_search_url = base_url + self._get_search_api_offset() base_search_url = base_url + self._get_search_api_offset()
# Filter in/out datasets from particular organizations
org_filter_include = self.config.get('organizations_filter_include', [])
org_filter_exclude = self.config.get('organizations_filter_exclude', [])
def get_pkg_ids_for_organizations(orgs):
    '''Return the ids of all remote datasets in any of the given
    organizations.

    Each organization is looked up through the remote search API
    (``base_search_url``) and its results are paged through with the
    ``offset`` parameter.

    :param orgs: iterable of remote organization names
    :returns: set of dataset ids (empty if ``orgs`` is empty)
    '''
    pkg_ids = set()
    for organization in orgs:
        url = base_search_url + '/dataset?organization=%s' % organization
        content_json = json.loads(self._get_content(url))
        result_count = int(content_json['count'])
        page = content_json['results']
        pkg_ids.update(page)
        # Paging progress is tracked per organization. pkg_ids accumulates
        # across organizations, so its size says nothing about how far into
        # this organization's result list we are.
        fetched = len(page)
        # An empty page also ends the loop, so a count that disagrees with
        # the results actually served cannot keep us requesting forever.
        while page and fetched < result_count:
            url = base_search_url + '/dataset?organization=%s&offset=%s' % (
                organization, fetched)
            content_json = json.loads(self._get_content(url))
            page = content_json['results']
            pkg_ids.update(page)
            fetched += len(page)
    return pkg_ids
include_pkg_ids = get_pkg_ids_for_organizations(org_filter_include)
exclude_pkg_ids = get_pkg_ids_for_organizations(org_filter_exclude)
if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0): if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0):
if not self.config.get('force_all',False): if not self.config.get('force_all',False):
get_all_packages = False get_all_packages = False
@ -182,9 +202,7 @@ class CKANHarvester(HarvesterBase):
continue continue
revision = json.loads(content) revision = json.loads(content)
for package_id in revision['packages']: package_ids = revision['packages']
if not package_id in package_ids:
package_ids.append(package_id)
else: else:
log.info('No packages have been updated on the remote CKAN instance since the last harvest job') log.info('No packages have been updated on the remote CKAN instance since the last harvest job')
return None return None
@ -197,19 +215,22 @@ class CKANHarvester(HarvesterBase):
self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
return None return None
if get_all_packages: if get_all_packages:
# Request all remote packages # Request all remote packages
url = base_rest_url + '/package' url = base_rest_url + '/package'
try: try:
content = self._get_content(url) content = self._get_content(url)
except ContentFetchError,e: except ContentFetchError,e:
self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
return None return None
package_ids = json.loads(content) package_ids = json.loads(content)
if org_filter_include:
package_ids = set(package_ids) & include_pkg_ids
elif org_filter_exclude:
package_ids = set(package_ids) - exclude_pkg_ids
try: try:
object_ids = [] object_ids = []
if len(package_ids): if len(package_ids):

View File

@ -70,9 +70,9 @@ def harvest_source_create(context,data_dict):
return source return source
def harvest_job_create(context,data_dict): def harvest_job_create(context, data_dict):
log.info('Harvest job create: %r', data_dict) log.info('Harvest job create: %r', data_dict)
check_access('harvest_job_create',context,data_dict) check_access('harvest_job_create', context, data_dict)
source_id = data_dict['source_id'] source_id = data_dict['source_id']
@ -84,13 +84,16 @@ def harvest_job_create(context,data_dict):
# Check if the source is active # Check if the source is active
if not source.active: if not source.active:
log.warn('Harvest job cannot be created for inactive source %s', source_id) log.warn('Harvest job cannot be created for inactive source %s',
source_id)
raise Exception('Can not create jobs on inactive sources') raise Exception('Can not create jobs on inactive sources')
# Check if there already is an unrun or currently running job for this source # Check if there already is an unrun or currently running job for this
# source
exists = _check_for_existing_jobs(context, source_id) exists = _check_for_existing_jobs(context, source_id)
if exists: if exists:
log.warn('There is already an unrun job %r for this source %s', exists, source_id) log.warn('There is already an unrun job %r for this source %s',
exists, source_id)
raise HarvestJobExists('There already is an unrun job for this source') raise HarvestJobExists('There already is an unrun job for this source')
job = HarvestJob() job = HarvestJob()
@ -98,7 +101,8 @@ def harvest_job_create(context,data_dict):
job.save() job.save()
log.info('Harvest job saved %s', job.id) log.info('Harvest job saved %s', job.id)
return harvest_job_dictize(job,context) return harvest_job_dictize(job, context)
def harvest_job_create_all(context,data_dict): def harvest_job_create_all(context,data_dict):
log.info('Harvest job create all: %r', data_dict) log.info('Harvest job create all: %r', data_dict)

View File

@ -20,7 +20,7 @@ from ckanext.harvest.logic.dictization import (harvest_source_dictize,
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@side_effect_free @side_effect_free
def harvest_source_show(context,data_dict): def harvest_source_show(context, data_dict):
''' '''
Returns the metadata of a harvest source Returns the metadata of a harvest source
@ -234,6 +234,11 @@ def harvest_job_report(context, data_dict):
@side_effect_free @side_effect_free
def harvest_job_list(context,data_dict): def harvest_job_list(context,data_dict):
'''Returns a list of jobs and details of objects and errors.
:param status: filter by e.g. "New" or "Finished" jobs
:param source_id: filter by a harvest source
'''
check_access('harvest_job_list',context,data_dict) check_access('harvest_job_list',context,data_dict)
@ -315,6 +320,7 @@ def harvest_object_list(context,data_dict):
@side_effect_free @side_effect_free
def harvesters_info_show(context,data_dict): def harvesters_info_show(context,data_dict):
'''Returns details of the installed harvesters.'''
check_access('harvesters_info_show',context,data_dict) check_access('harvesters_info_show',context,data_dict)

View File

@ -27,7 +27,7 @@ from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError
from ckanext.harvest.logic.schema import harvest_source_show_package_schema from ckanext.harvest.logic.dictization import harvest_job_dictize
from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
@ -83,10 +83,12 @@ def harvest_source_update(context,data_dict):
return source return source
def harvest_source_clear(context,data_dict):
def harvest_source_clear(context, data_dict):
''' '''
Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. Clears all datasets, jobs and objects related to a harvest source, but
This is useful to clean history of long running harvest sources to start again fresh. keeps the source itself. This is useful to clean history of long running
harvest sources to start again fresh.
:param id: the id of the harvest source to clear :param id: the id of the harvest source to clear
:type id: string :type id: string
@ -94,7 +96,7 @@ def harvest_source_clear(context,data_dict):
''' '''
check_access('harvest_source_clear',context,data_dict) check_access('harvest_source_clear',context,data_dict)
harvest_source_id = data_dict.get('id',None) harvest_source_id = data_dict.get('id', None)
source = HarvestSource.get(harvest_source_id) source = HarvestSource.get(harvest_source_id)
if not source: if not source:
@ -182,6 +184,14 @@ def harvest_source_clear(context,data_dict):
return {'id': harvest_source_id} return {'id': harvest_source_id}
def harvest_source_index_clear(context,data_dict): def harvest_source_index_clear(context,data_dict):
'''
Clears all datasets, jobs and objects related to a harvest source, but
keeps the source itself. This is useful to clean history of long running
harvest sources to start again fresh.
:param id: the id of the harvest source to clear
:type id: string
'''
check_access('harvest_source_clear',context,data_dict) check_access('harvest_source_clear',context,data_dict)
harvest_source_id = data_dict.get('id',None) harvest_source_id = data_dict.get('id',None)
@ -208,14 +218,26 @@ def harvest_source_index_clear(context,data_dict):
return {'id': harvest_source_id} return {'id': harvest_source_id}
def harvest_objects_import(context,data_dict):
def harvest_objects_import(context, data_dict):
''' '''
Reimports the current harvest objects Reimports the existing harvest objects, specified by either source_id,
harvest_object_id or package_id.
It performs the import stage with the last fetched objects, optionally It performs the import stage with the last fetched objects, optionally
belonging to a certain source. belonging to a certain source.
Please note that no objects will be fetched from the remote server. Please note that no objects will be fetched from the remote server.
It will only affect the last fetched objects already present in the It will only affect the last fetched objects already present in the
database. database.
:param source_id: the id of the harvest source to import
:type source_id: string
:param harvest_object_id: the id of the harvest object to import
:type harvest_object_id: string
:param package_id: the id or name of the package to import
:type package_id: string
''' '''
log.info('Harvest objects import: %r', data_dict) log.info('Harvest objects import: %r', data_dict)
check_access('harvest_objects_import',context,data_dict) check_access('harvest_objects_import',context,data_dict)
@ -393,6 +415,64 @@ def harvest_jobs_run(context,data_dict):
return sent_jobs return sent_jobs
def harvest_job_abort(context, data_dict):
    '''
    Aborts the most recent harvest job of a harvest source.

    The latest job created for the given source is looked up and, unless it
    is already "Finished", its status is set to "Finished". Every harvest
    object attached to that job whose state is neither "COMPLETE" nor
    "ERROR" is then set to "ERROR", so that none are left in limbo. This
    does not actually stop any harvest fetches/objects that are already
    queued or running.

    :param source_id: the name or id of the harvest source with a job to abort
    :type source_id: string

    :returns: the aborted job, as produced by ``harvest_job_dictize``
    :raises NotFound: if the source has no jobs
    '''
    check_access('harvest_job_abort', context, data_dict)

    model = context['model']

    source = harvest_source_show(context,
                                 {'id': data_dict.get('source_id', None)})

    # Fetch only the newest job with a direct query. Do not use
    # harvest_job_list here since it can use a lot of memory.
    jobs_newest_first = (model.Session.query(HarvestJob)
                         .filter_by(source_id=source['id'])
                         .order_by(HarvestJob.created.desc()))
    latest = jobs_newest_first.first()
    if not latest:
        raise NotFound('Error: source has no jobs')
    job = get_action('harvest_job_show')(context, {'id': latest.id})
    job_id = job['id']

    # Step 1: set the HarvestJob status to 'Finished'
    if job['status'] == 'Finished':
        log.info('Harvest job unchanged. Source %s status is: "%s"',
                 job_id, job['status'])
    else:
        # i.e. the job is New or Running
        job_obj = HarvestJob.get(job_id)
        job_obj.status = new_status = 'Finished'
        model.repo.commit_and_remove()
        log.info('Harvest job changed status from "%s" to "%s"',
                 job['status'], new_status)

    # Step 2: set unfinished HarvestObjects to ERROR.
    # The job is looked up again rather than reused - presumably because
    # commit_and_remove() discards the session; TODO confirm.
    job_obj = HarvestJob.get(job_id)
    for harvest_object in job_obj.objects:
        if harvest_object.state in ('COMPLETE', 'ERROR'):
            log.info('Harvest object not changed from "%s": %s',
                     harvest_object.state, harvest_object.id)
            continue
        previous_state = harvest_object.state
        harvest_object.state = 'ERROR'
        log.info('Harvest object changed state from "%s" to "%s": %s',
                 previous_state, harvest_object.state, harvest_object.id)
    model.repo.commit_and_remove()

    return harvest_job_dictize(HarvestJob.get(job_id), context)
@logic.side_effect_free @logic.side_effect_free
def harvest_sources_reindex(context, data_dict): def harvest_sources_reindex(context, data_dict):
''' '''
@ -430,7 +510,8 @@ def harvest_source_reindex(context, data_dict):
context.update({'ignore_auth': True}) context.update({'ignore_auth': True})
package_dict = logic.get_action('harvest_source_show')(context, package_dict = logic.get_action('harvest_source_show')(context,
{'id': harvest_source_id}) {'id': harvest_source_id})
log.debug('Updating search index for harvest source {0}'.format(harvest_source_id)) log.debug('Updating search index for harvest source: {0}'.format(
package_dict.get('name') or harvest_source_id))
# Remove configuration values # Remove configuration values
new_dict = {} new_dict = {}

View File

@ -58,6 +58,16 @@ def harvest_jobs_run(context, data_dict):
else: else:
return {'success': True} return {'success': True}
def harvest_job_abort(context, data_dict):
    '''
    Authorization check for aborting a running harvest job.

    Aborting requires the same permissions as running harvest jobs, so the
    decision is delegated unchanged to ``harvest_jobs_run``.
    '''
    auth_result = harvest_jobs_run(context, data_dict)
    return auth_result
def harvest_sources_reindex(context, data_dict): def harvest_sources_reindex(context, data_dict):
''' '''
Authorization check for reindexing all harvest sources Authorization check for reindexing all harvest sources

View File

@ -118,7 +118,13 @@ class HarvestSource(HarvestDomainObject):
or "inactive". The harvesting processes are not fired on inactive or "inactive". The harvesting processes are not fired on inactive
sources. sources.
''' '''
pass def __repr__(self):
return '<HarvestSource id=%s title=%s url=%s active=%r>' % \
(self.id, self.title, self.url, self.active)
def __str__(self):
return self.__repr__().encode('ascii', 'ignore')
class HarvestJob(HarvestDomainObject): class HarvestJob(HarvestDomainObject):
'''A Harvesting Job is performed in two phases. In first place, the '''A Harvesting Job is performed in two phases. In first place, the

View File

@ -16,7 +16,8 @@ log = logging.getLogger(__name__)
assert not log.disabled assert not log.disabled
__all__ = ['get_gather_publisher', 'get_gather_consumer', __all__ = ['get_gather_publisher', 'get_gather_consumer',
'get_fetch_publisher', 'get_fetch_consumer'] 'get_fetch_publisher', 'get_fetch_consumer',
'get_harvester']
PORT = 5672 PORT = 5672
USERID = 'guest' USERID = 'guest'
@ -233,11 +234,9 @@ def gather_callback(channel, method, header, body):
# Send the harvest job to the plugins that implement # Send the harvest job to the plugins that implement
# the Harvester interface, only if the source type # the Harvester interface, only if the source type
# matches # matches
harvester_found = False harvester = get_harvester(job.source.type)
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type:
harvester_found = True
if harvester:
try: try:
harvest_object_ids = gather_stage(harvester, job) harvest_object_ids = gather_stage(harvester, job)
except (Exception, KeyboardInterrupt): except (Exception, KeyboardInterrupt):
@ -263,12 +262,10 @@ def gather_callback(channel, method, header, body):
publisher.send({'harvest_object_id':id}) publisher.send({'harvest_object_id':id})
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids))) log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
if not harvester_found: else:
# This can occur if you: # This can occur if you:
# * remove a harvester and it still has sources that are then # * remove a harvester and it still has sources that are then refreshed
# refreshed # * add a new harvester and restart CKAN but not the gather queue.
# * add a new harvester and restart CKAN but not the gather
# queue.
msg = 'System error - No harvester could be found for source type %s' % job.source.type msg = 'System error - No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job) err = HarvestGatherError(message=msg,job=job)
err.save() err.save()
@ -279,6 +276,12 @@ def gather_callback(channel, method, header, body):
channel.basic_ack(method.delivery_tag) channel.basic_ack(method.delivery_tag)
def get_harvester(harvest_source_type):
    '''Returns the first IHarvester plugin whose ``info()['name']`` equals
    ``harvest_source_type``, or None if no plugin matches.

    :param harvest_source_type: value compared against each plugin's
        ``info()['name']``
    '''
    matching_plugins = (plugin
                        for plugin in PluginImplementations(IHarvester)
                        if plugin.info()['name'] == harvest_source_type)
    # next() with a default stops at the first match and yields None when
    # nothing matches.
    return next(matching_plugins, None)
def gather_stage(harvester, job): def gather_stage(harvester, job):
'''Calls the harvester's gather_stage, returning harvest object ids, with '''Calls the harvester's gather_stage, returning harvest object ids, with
some error handling. some error handling.

View File

@ -1,7 +1,7 @@
import re import re
from nose.tools import assert_equal from nose.tools import assert_equal
from ckanext.harvest import model as harvest_model
from ckanext.harvest.harvesters.base import HarvesterBase from ckanext.harvest.harvesters.base import HarvesterBase
try: try:
from ckan.tests import helpers from ckan.tests import helpers
@ -18,6 +18,7 @@ class TestGenNewName(object):
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
helpers.reset_db() helpers.reset_db()
harvest_model.setup()
def test_basic(self): def test_basic(self):
assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees') assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees')
@ -31,6 +32,7 @@ class TestGenNewName(object):
class TestEnsureNameIsUnique(object): class TestEnsureNameIsUnique(object):
def setup(self): def setup(self):
helpers.reset_db() helpers.reset_db()
harvest_model.setup()
def test_no_existing_datasets(self): def test_no_existing_datasets(self):
factories.Dataset(name='unrelated') factories.Dataset(name='unrelated')

View File

@ -116,3 +116,21 @@ class TestCkanHarvester(object):
assert_equal(result['state'], 'COMPLETE') assert_equal(result['state'], 'COMPLETE')
assert_equal(result['report_status'], 'added') assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
def test_exclude_organizations(self):
    # Harvest from the mock CKAN with 'org1-id' listed in
    # organizations_filter_exclude.
    harvest_config = json.dumps(
        {'organizations_filter_exclude': ['org1-id']})
    source_url = 'http://localhost:%s' % mock_ckan.PORT

    results_by_guid = run_harvest(
        url=source_url,
        harvester=CKANHarvester(),
        config=harvest_config)

    # 'dataset1-id' must be filtered out; the second mock dataset must
    # still be harvested.
    assert 'dataset1-id' not in results_by_guid
    assert mock_ckan.DATASETS[1]['id'] in results_by_guid
def test_include_organizations(self):
    # Harvest from the mock CKAN with 'org1-id' listed in
    # organizations_filter_include.
    harvest_config = json.dumps(
        {'organizations_filter_include': ['org1-id']})
    source_url = 'http://localhost:%s' % mock_ckan.PORT

    results_by_guid = run_harvest(
        url=source_url,
        harvester=CKANHarvester(),
        config=harvest_config)

    # 'dataset1-id' must be harvested; the second mock dataset must be
    # filtered out.
    assert 'dataset1-id' in results_by_guid
    assert mock_ckan.DATASETS[1]['id'] not in results_by_guid