diff --git a/README.rst b/README.rst index 1d21059..c3337c6 100644 --- a/README.rst +++ b/README.rst @@ -31,24 +31,27 @@ Installation ckan.harvest.mq.type = amqp +2. Activate your CKAN virtual environment, for example:: -2. Install the extension into your python environment:: + $ . /usr/lib/ckan/default/bin/activate + +3. Install the ckanext-harvest Python package into your virtual environment:: (pyenv) $ pip install -e git+https://github.com/ckan/ckanext-harvest.git#egg=ckanext-harvest -3. Install the rest of python modules required by the extension:: +4. Install the python modules required by the extension:: (pyenv) $ pip install -r pip-requirements.txt -4. Make sure the CKAN configuration ini file contains the harvest main plugin, as +5. Make sure the CKAN configuration ini file contains the harvest main plugin, as well as the harvester for CKAN instances if you need it (included with the extension):: - ckan.plugins = harvest ckan_harvester + ckan.plugins = harvest ckan_harvester -5. If you haven't done it yet on the previous step, define the backend that you are using with the ``ckan.harvest.mq.type`` - option (it defaults to ``rabbitmq``):: +6. If you haven't done it yet on the previous step, define the backend that you + are using with the ``ckan.harvest.mq.type`` option (it defaults to ``amqp``):: - ckan.harvest.mq.type = redis + ckan.harvest.mq.type = redis There are a number of configuration options available for the backends. These don't need to be modified at all if you are using the default Redis or RabbitMQ install (step 1). 
The list @@ -96,24 +99,44 @@ The following operations can be run from the command line using the harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}] - create new harvest source - harvester rmsource {id} - - remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects + harvester source {source-id/name} + - shows a harvest source - harvester clearsource {id} - - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself + harvester rmsource {source-id/name} + - remove (deactivate) a harvester source, whilst leaving any related + datasets, jobs and objects + + harvester clearsource {source-id/name} + - clears all datasets, jobs and objects related to a harvest source, + but keeps the source itself harvester sources [all] - lists harvest sources If 'all' is defined, it also shows the Inactive sources - harvester job {source-id} + harvester job {source-id/name} - create new harvest job harvester jobs - lists harvest jobs + harvester job_abort {source-id/name} + - marks a job as "Aborted" so that the source can be restarted afresh. + It ensures that the job's harvest objects status are also marked + finished. You should ensure that neither the job nor its objects are + currently in the gather/fetch queues. + harvester run - - runs harvest jobs + - starts any harvest jobs that have been created by putting them onto + the gather queue. Also checks running jobs and if finished, it + changes their status to Finished. + + harvester run_test {source-id/name} + - runs a harvest - for testing only. + This does all the stages of the harvest (creates job, gather, fetch, + import) without involving the web UI or the queue backends. This is + useful for testing a harvester without having to fire up + gather/fetch_consumer processes, as is done in production. 
harvester gather_consumer - starts the consumer for the gathering queue @@ -123,14 +146,22 @@ The following operations can be run from the command line using the harvester purge_queues - removes all jobs from fetch and gather queue + WARNING: if using Redis, this command purges all data in the current + Redis database - harvester [-j] [--segments={segments}] import [{source-id}] - - perform the import stage with the last fetched objects, optionally belonging to a certain source. - Please note that no objects will be fetched from the remote server. It will only affect - the last fetched objects already present in the database. + harvester [-j] [-o] [--segments={segments}] import [{source-id}] + - perform the import stage with the last fetched objects, for a certain + source or a single harvest object. Please note that no objects will + be fetched from the remote server. It will only affect the objects + already present in the database. - If the -j flag is provided, the objects are not joined to existing datasets. This may be useful - when importing objects for the first time. + To import a particular harvest source, specify its id as an argument. + To import a particular harvest object use the -o option. + To import a particular package use the -p option. + + You will need to specify the -j flag in cases where the datasets are + not yet created (e.g. first harvest, or all previous harvests have + failed) The --segments flag allows to define a string containing hex digits that represent which of the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f @@ -233,6 +264,22 @@ field. The currently supported configuration options are: lower-case ones, and spaces replaced with dashes. Setting this option to False gives the same effect as leaving it unset. +* organizations_filter_include: This configuration option allows you to specify + a list of remote organization names (e.g. 
"arkansas-gov" is the name for + organization http://catalog.data.gov/organization/arkansas-gov ). If this + property has a value then only datasets that are in one of these organizations + will be harvested. All other datasets will be skipped. Only one of + organizations_filter_include or organizations_filter_exclude should be + configured. + +* organizations_filter_exclude: This configuration option allows you to specify + a list of remote organization names (e.g. "arkansas-gov" is the name for + organization http://catalog.data.gov/organization/arkansas-gov ). If this + property is set then all datasets from the remote source will be harvested + unless it belongs to one of the organizations in this option. Only one of + organizations_filter_exclude or organizations_filter_include should be + configured. + Here is an example of a configuration object (the one that must be entered in the configuration field):: @@ -242,6 +289,8 @@ the configuration field):: "default_groups":["my-own-group"], "default_extras":{"new_extra":"Test","harvest_url":"{harvest_source_url}/dataset/{dataset_id}"}, "override_extras": true, + "organizations_filter_include": [], + "organizations_filter_exclude": ["remote-organization"], "user":"harverster-user", "api_key":"", "read_only": true, @@ -405,14 +454,43 @@ Here you can also find other examples of custom harvesters: * https://github.com/ckan/ckanext-dcat/tree/master/ckanext/dcat/harvesters * https://github.com/ckan/ckanext-spatial/tree/master/ckanext/spatial/harvesters - Running the harvest jobs ======================== -The harvesting extension uses two different queues, one that handles the -gathering and another one that handles the fetching and importing. To start -the consumers run the following command -(make sure you have your python environment activated):: +There are two ways to run a harvest:: + + 1. ``harvester run_test`` for the command-line, suitable for testing + 2. 
``harvester run`` used by the Web UI and scheduled runs + +harvester run_test +------------------ + +You can run a harvester simply using the ``run_test`` command. This is handy +for running a harvest with one command in the console and see all the output +in-line. It runs the gather, fetch and import stages all in the same process. + +This is useful for developing a harvester because you can insert break-points +in your harvester, and rerun a harvest without having to restart the +gather_consumer and fetch_consumer processes each time. In addition, because it +doesn't use the queue backends it doesn't interfere with harvests of other +sources that may be going on in the background. + +However running this way, if gather_stage, fetch_stage or import_stage raise an +exception, they are not caught, whereas with ``harvester run`` they are handled +slightly differently as they are called by queue.py. So when testing this +aspect its best to use ``harvester run``. + +harvester run +------------- + +When a harvest job is started by a user in the Web UI, or by a scheduled +harvest, the harvest is started by the ``harvester run`` command. This is the +normal method in production systems and scales well. + +In this case, the harvesting extension uses two different queues: one that +handles the gathering and another one that handles the fetching and importing. +To start the consumers run the following command (make sure you have your +python environment activated):: paster --plugin=ckanext-harvest harvester gather_consumer --config=mysite.ini @@ -429,9 +507,19 @@ The ``run`` command not only starts any pending harvesting jobs, but also flags those that are finished, allowing new jobs to be created on that particular source and refreshing the source statistics. That means that you will need to run this command before being able to create a new job on a source that was being -harvested (On a production site you will typically have a cron job that runs the +harvested. 
(On a production site you will typically have a cron job that runs the command regularly, see next section). +Occasionally you can find a harvesting job is in a "limbo state" where the job +has run with errors but the ``harvester run`` command will not mark it as +finished, and therefore you cannot run another job. This is due to particular +harvester not handling errors correctly e.g. during development. In this +circumstance, ensure that the gather & fetch consumers are running and have +nothing more to consume, and then run this abort command with the name or id of +the harvest source:: + + paster --plugin=ckanext-harvest harvester job_abort {source-id/name} --config=mysite.ini + Setting up the harvesters on a production server ================================================ diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index c0592e4..9f8db0d 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -18,24 +18,44 @@ class Harvester(CkanCommand): harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}] - create new harvest source - harvester rmsource {id} - - remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects + harvester source {source-id/name} + - shows a harvest source - harvester clearsource {id} - - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself + harvester rmsource {source-id/name} + - remove (deactivate) a harvester source, whilst leaving any related + datasets, jobs and objects + + harvester clearsource {source-id/name} + - clears all datasets, jobs and objects related to a harvest source, + but keeps the source itself harvester sources [all] - lists harvest sources If 'all' is defined, it also shows the Inactive sources - harvester job {source-id} + harvester job {source-id/name} - create new harvest job harvester jobs - lists harvest jobs + 
harvester job_abort {source-id/name} + - marks a job as "Aborted" so that the source can be restarted afresh. + It ensures that the job's harvest objects status are also marked + finished. You should ensure that neither the job nor its objects are + currently in the gather/fetch queues. + harvester run - - runs harvest jobs + - starts any harvest jobs that have been created by putting them onto + the gather queue. Also checks running jobs and if finished, it + changes their status to Finished. + + harvester run_test {source-id/name} + - runs a harvest - for testing only. + This does all the stages of the harvest (creates job, gather, fetch, + import) without involving the web UI or the queue backends. This is + useful for testing a harvester without having to fire up + gather/fetch_consumer processes, as is done in production. harvester gather_consumer - starts the consumer for the gathering queue @@ -54,10 +74,13 @@ class Harvester(CkanCommand): be fetched from the remote server. It will only affect the objects already present in the database. - To perform it on a particular object use the -o flag. + To import a particular harvest source, specify its id as an argument. + To import a particular harvest object use the -o option. + To import a particular package use the -p option. - If the -j flag is provided, the objects are not joined to existing datasets. This may be useful - when importing objects for the first time. + You will need to specify the -j flag in cases where the datasets are + not yet created (e.g. first harvest, or all previous harvests have + failed) The --segments flag allows to define a string containing hex digits that represent which of the 16 harvest object segments to import. e.g. 
15af will run segments 1,5,a,f @@ -115,7 +138,10 @@ class Harvester(CkanCommand): sys.exit(1) cmd = self.args[0] if cmd == 'source': - self.create_harvest_source() + if len(self.args) > 2: + self.create_harvest_source() + else: + self.show_harvest_source() elif cmd == 'rmsource': self.remove_harvest_source() elif cmd == 'clearsource': @@ -126,8 +152,12 @@ class Harvester(CkanCommand): self.create_harvest_job() elif cmd == 'jobs': self.list_harvest_jobs() + elif cmd == 'job_abort': + self.job_abort() elif cmd == 'run': self.run_harvester() + elif cmd == 'run_test': + self.run_test_harvest() elif cmd == 'gather_consumer': import logging from ckanext.harvest.queue import (get_gather_consumer, @@ -240,7 +270,8 @@ class Harvester(CkanCommand): # Create a harvest job for the new source if not regular job. if not data_dict['frequency']: - get_action('harvest_job_create')(context,{'source_id':source['id']}) + get_action('harvest_job_create')( + context, {'source_id': source['id']}) print 'A new Harvest Job for this source has also been created' except ValidationError,e: @@ -248,25 +279,44 @@ class Harvester(CkanCommand): print str(e.error_dict) raise e + def show_harvest_source(self): + + if len(self.args) >= 2: + source_id_or_name = unicode(self.args[1]) + else: + print 'Please provide a source name' + sys.exit(1) + context = {'model': model, 'session': model.Session, + 'user': self.admin_user['name']} + source = get_action('harvest_source_show')( + context, {'id': source_id_or_name}) + self.print_harvest_source(source) + def remove_harvest_source(self): if len(self.args) >= 2: - source_id = unicode(self.args[1]) + source_id_or_name = unicode(self.args[1]) else: print 'Please provide a source id' sys.exit(1) - context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session} - get_action('harvest_source_delete')(context,{'id':source_id}) - print 'Removed harvest source: %s' % source_id + context = {'model': model, 'session': model.Session, + 'user': 
self.admin_user['name']} + source = get_action('harvest_source_show')( + context, {'id': source_id_or_name}) + get_action('harvest_source_delete')(context, {'id': source['id']}) + print 'Removed harvest source: %s' % source_id_or_name def clear_harvest_source(self): if len(self.args) >= 2: - source_id = unicode(self.args[1]) + source_id_or_name = unicode(self.args[1]) else: print 'Please provide a source id' sys.exit(1) - context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session} - get_action('harvest_source_clear')(context,{'id':source_id}) - print 'Cleared harvest source: %s' % source_id + context = {'model': model, 'session': model.Session, + 'user': self.admin_user['name']} + source = get_action('harvest_source_show')( + context, {'id': source_id_or_name}) + get_action('harvest_source_clear')(context, {'id': source['id']}) + print 'Cleared harvest source: %s' % source_id_or_name def list_harvest_sources(self): if len(self.args) >= 2 and self.args[1] == 'all': @@ -283,13 +333,17 @@ class Harvester(CkanCommand): def create_harvest_job(self): if len(self.args) >= 2: - source_id = unicode(self.args[1]) + source_id_or_name = unicode(self.args[1]) else: print 'Please provide a source id' sys.exit(1) + context = {'model': model, 'session': model.Session, + 'user': self.admin_user['name']} + source = get_action('harvest_source_show')( + context, {'id': source_id_or_name}) - context = {'model': model,'session':model.Session, 'user': self.admin_user['name']} - job = get_action('harvest_job_create')(context,{'source_id':source_id}) + job = get_action('harvest_job_create')( + context, {'source_id': source['id']}) self.print_harvest_job(job) jobs = get_action('harvest_job_list')(context,{'status':u'New'}) @@ -302,6 +356,23 @@ class Harvester(CkanCommand): self.print_harvest_jobs(jobs) self.print_there_are(what='harvest job', sequence=jobs) + def job_abort(self): + if len(self.args) >= 2: + source_id_or_name = unicode(self.args[1]) + else: + print 
'Please provide a source id' + sys.exit(1) + context = {'model': model, 'session': model.Session, + 'user': self.admin_user['name']} + source = get_action('harvest_source_show')( + context, {'id': source_id_or_name}) + + context = {'model': model, 'user': self.admin_user['name'], + 'session': model.Session} + job = get_action('harvest_job_abort')(context, + {'source_id': source['id']}) + print 'Job status: {0}'.format(job['status']) + def run_harvester(self): context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session} try: @@ -309,18 +380,69 @@ class Harvester(CkanCommand): except NoNewHarvestJobError: print 'There are no new harvest jobs to run.' + def run_test_harvest(self): + from ckanext.harvest import queue + from ckanext.harvest.tests import lib + from ckanext.harvest.logic import HarvestJobExists + from ckanext.harvest.model import HarvestJob + + # Determine the source + if len(self.args) >= 2: + source_id_or_name = unicode(self.args[1]) + else: + print 'Please provide a source id' + sys.exit(1) + context = {'model': model, 'session': model.Session, + 'user': self.admin_user['name']} + source = get_action('harvest_source_show')( + context, {'id': source_id_or_name}) + + # Determine the job + try: + job_dict = get_action('harvest_job_create')( + context, {'source_id': source['id']}) + except HarvestJobExists: + running_jobs = get_action('harvest_job_list')( + context, {'source_id': source['id'], 'status': 'Running'}) + if running_jobs: + print '\nSource "%s" apparently has a "Running" job:\n%r' \ + % (source.get('name') or source['id'], running_jobs) + resp = raw_input('Abort it? (y/n)') + if not resp.lower().startswith('y'): + sys.exit(1) + job_dict = get_action('harvest_job_abort')( + context, {'source_id': source['id']}) + else: + print 'Reusing existing harvest job' + jobs = get_action('harvest_job_list')( + context, {'source_id': source['id'], 'status': 'New'}) + assert len(jobs) == 1, \ + 'Multiple "New" jobs for this source! 
%r' % jobs + job_dict = jobs[0] + job_obj = HarvestJob.get(job_dict['id']) + + harvester = queue.get_harvester(source['source_type']) + assert harvester, \ + 'No harvester found for type: %s' % source['source_type'] + lib.run_harvest_job(job_obj, harvester) + def import_stage(self): if len(self.args) >= 2: - source_id = unicode(self.args[1]) + source_id_or_name = unicode(self.args[1]) + context = {'model': model, 'session': model.Session, + 'user': self.admin_user['name']} + source = get_action('harvest_source_show')( + context, {'id': source_id_or_name}) + source_id = source['id'] else: source_id = None - context = {'model': model, 'session':model.Session, 'user': self.admin_user['name'], + context = {'model': model, 'session': model.Session, + 'user': self.admin_user['name'], 'join_datasets': not self.options.no_join_datasets, 'segments': self.options.segments} - objs_count = get_action('harvest_objects_import')(context,{ 'source_id': source_id, 'harvest_object_id': self.options.harvest_object_id, @@ -347,9 +469,16 @@ class Harvester(CkanCommand): def print_harvest_source(self, source): print 'Source id: %s' % source.get('id') + if 'name' in source: + # 'name' is only there if the source comes from the Package + print ' name: %s' % source.get('name') print ' url: %s' % source.get('url') - print ' type: %s' % source.get('type') - print ' active: %s' % (source.get('active', source.get('state') == 'active')) + # 'type' if source comes from HarvestSource, 'source_type' if it comes + # from the Package + print ' type: %s' % (source.get('source_type') or + source.get('type')) + print ' active: %s' % (source.get('active', + source.get('state') == 'active')) print 'frequency: %s' % source.get('frequency') print ' jobs: %s' % source.get('status').get('job_count') print '' diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 6339969..f041216 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ 
b/ckanext/harvest/harvesters/ckanharvester.py @@ -46,7 +46,7 @@ class CKANHarvester(HarvesterBase): http_response = urllib2.urlopen(http_request) except urllib2.URLError, e: raise ContentFetchError( - 'Could not fetch url: %s, error: %s' % + 'Could not fetch url: %s, error: %s' % (url, str(e)) ) return http_response.read() @@ -160,6 +160,26 @@ class CKANHarvester(HarvesterBase): base_rest_url = base_url + self._get_rest_api_offset() base_search_url = base_url + self._get_search_api_offset() + # Filter in/out datasets from particular organizations + org_filter_include = self.config.get('organizations_filter_include', []) + org_filter_exclude = self.config.get('organizations_filter_exclude', []) + def get_pkg_ids_for_organizations(orgs): + pkg_ids = set() + for organization in orgs: + url = base_search_url + '/dataset?organization=%s' % organization + content = self._get_content(url) + content_json = json.loads(content) + result_count = int(content_json['count']) + pkg_ids |= set(content_json['results']) + while len(pkg_ids) < result_count or not content_json['results']: + url = base_search_url + '/dataset?organization=%s&offset=%s' % (organization, len(pkg_ids)) + content = self._get_content(url) + content_json = json.loads(content) + pkg_ids |= set(content_json['results']) + return pkg_ids + include_pkg_ids = get_pkg_ids_for_organizations(org_filter_include) + exclude_pkg_ids = get_pkg_ids_for_organizations(org_filter_exclude) + if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0): if not self.config.get('force_all',False): get_all_packages = False @@ -182,9 +202,7 @@ class CKANHarvester(HarvesterBase): continue revision = json.loads(content) - for package_id in revision['packages']: - if not package_id in package_ids: - package_ids.append(package_id) + package_ids = revision['packages'] else: log.info('No packages have been updated on the remote CKAN instance since the last harvest job') return None @@ -197,19 +215,22 @@ 
class CKANHarvester(HarvesterBase): self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None - - if get_all_packages: # Request all remote packages url = base_rest_url + '/package' + try: content = self._get_content(url) except ContentFetchError,e: self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None - package_ids = json.loads(content) + if org_filter_include: + package_ids = set(package_ids) & include_pkg_ids + elif org_filter_exclude: + package_ids = set(package_ids) - exclude_pkg_ids + try: object_ids = [] if len(package_ids): diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py index 8c23ded..203b9eb 100644 --- a/ckanext/harvest/logic/action/create.py +++ b/ckanext/harvest/logic/action/create.py @@ -70,9 +70,9 @@ def harvest_source_create(context,data_dict): return source -def harvest_job_create(context,data_dict): +def harvest_job_create(context, data_dict): log.info('Harvest job create: %r', data_dict) - check_access('harvest_job_create',context,data_dict) + check_access('harvest_job_create', context, data_dict) source_id = data_dict['source_id'] @@ -84,13 +84,16 @@ def harvest_job_create(context,data_dict): # Check if the source is active if not source.active: - log.warn('Harvest job cannot be created for inactive source %s', source_id) + log.warn('Harvest job cannot be created for inactive source %s', + source_id) raise Exception('Can not create jobs on inactive sources') - # Check if there already is an unrun or currently running job for this source + # Check if there already is an unrun or currently running job for this + # source exists = _check_for_existing_jobs(context, source_id) if exists: - log.warn('There is already an unrun job %r for this source %s', exists, source_id) + log.warn('There is already an unrun job %r for this source %s', + exists, source_id) raise HarvestJobExists('There already is an unrun job 
for this source') job = HarvestJob() @@ -98,7 +101,8 @@ def harvest_job_create(context,data_dict): job.save() log.info('Harvest job saved %s', job.id) - return harvest_job_dictize(job,context) + return harvest_job_dictize(job, context) + def harvest_job_create_all(context,data_dict): log.info('Harvest job create all: %r', data_dict) diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py index 2ddcdd9..4fbb5ac 100644 --- a/ckanext/harvest/logic/action/get.py +++ b/ckanext/harvest/logic/action/get.py @@ -20,7 +20,7 @@ from ckanext.harvest.logic.dictization import (harvest_source_dictize, log = logging.getLogger(__name__) @side_effect_free -def harvest_source_show(context,data_dict): +def harvest_source_show(context, data_dict): ''' Returns the metadata of a harvest source @@ -234,6 +234,11 @@ def harvest_job_report(context, data_dict): @side_effect_free def harvest_job_list(context,data_dict): + '''Returns a list of jobs and details of objects and errors. + + :param status: filter by e.g. 
"New" or "Finished" jobs + :param source_id: filter by a harvest source + ''' check_access('harvest_job_list',context,data_dict) @@ -315,6 +320,7 @@ def harvest_object_list(context,data_dict): @side_effect_free def harvesters_info_show(context,data_dict): + '''Returns details of the installed harvesters.''' check_access('harvesters_info_show',context,data_dict) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 9e871d9..bca776c 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -27,7 +27,7 @@ from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError -from ckanext.harvest.logic.schema import harvest_source_show_package_schema +from ckanext.harvest.logic.dictization import harvest_job_dictize from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user @@ -83,10 +83,12 @@ def harvest_source_update(context,data_dict): return source -def harvest_source_clear(context,data_dict): + +def harvest_source_clear(context, data_dict): ''' - Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. - This is useful to clean history of long running harvest sources to start again fresh. + Clears all datasets, jobs and objects related to a harvest source, but + keeps the source itself. This is useful to clean history of long running + harvest sources to start again fresh. 
:param id: the id of the harvest source to clear :type id: string @@ -94,7 +96,7 @@ def harvest_source_clear(context,data_dict): ''' check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) + harvest_source_id = data_dict.get('id', None) source = HarvestSource.get(harvest_source_id) if not source: @@ -182,6 +184,14 @@ def harvest_source_clear(context,data_dict): return {'id': harvest_source_id} def harvest_source_index_clear(context,data_dict): + ''' + Clears all datasets, jobs and objects related to a harvest source, but + keeps the source itself. This is useful to clean history of long running + harvest sources to start again fresh. + + :param id: the id of the harvest source to clear + :type id: string + ''' check_access('harvest_source_clear',context,data_dict) harvest_source_id = data_dict.get('id',None) @@ -208,14 +218,26 @@ def harvest_source_index_clear(context,data_dict): return {'id': harvest_source_id} -def harvest_objects_import(context,data_dict): + +def harvest_objects_import(context, data_dict): ''' - Reimports the current harvest objects - It performs the import stage with the last fetched objects, optionally - belonging to a certain source. - Please note that no objects will be fetched from the remote server. - It will only affect the last fetched objects already present in the - database. + Reimports the existing harvest objects, specified by either source_id, + harvest_object_id or package_id. + + It performs the import stage with the last fetched objects, optionally + belonging to a certain source. + + Please note that no objects will be fetched from the remote server. + + It will only affect the last fetched objects already present in the + database. 
+ :param source_id: the id of the harvest source to import + :type source_id: string + :param harvest_object_id: the id of the harvest object to import + :type harvest_object_id: string + :param package_id: the id or name of the package to import + :type package_id: string ''' log.info('Harvest objects import: %r', data_dict) check_access('harvest_objects_import',context,data_dict) @@ -393,6 +415,64 @@ def harvest_jobs_run(context,data_dict): return sent_jobs +def harvest_job_abort(context, data_dict): + ''' + Aborts a harvest job. Given a harvest source_id, it looks for the latest + one and (assuming it is not already Finished) marks it as Finished. It also + marks any of that source's harvest objects and (if not complete or error) + marks them "ERROR", so any left in limbo are cleaned up. Does not actually + stop running any queued harvest fetches/objects. + + :param source_id: the name or id of the harvest source with a job to abort + :type source_id: string + ''' + + check_access('harvest_job_abort', context, data_dict) + + model = context['model'] + + source_id = data_dict.get('source_id', None) + source = harvest_source_show(context, {'id': source_id}) + + # HarvestJob set status to 'Finished' + # Do not use harvest_job_list since it can use a lot of memory + last_job = model.Session.query(HarvestJob) \ + .filter_by(source_id=source['id']) \ + .order_by(HarvestJob.created.desc()).first() + if not last_job: + raise NotFound('Error: source has no jobs') + job = get_action('harvest_job_show')(context, + {'id': last_job.id}) + + if job['status'] != 'Finished': + # i.e. New or Running + job_obj = HarvestJob.get(job['id']) + job_obj.status = new_status = 'Finished' + model.repo.commit_and_remove() + log.info('Harvest job changed status from "%s" to "%s"', + job['status'], new_status) + else: + log.info('Harvest job unchanged. 
Source %s status is: "%s"', + job['id'], job['status']) + + # HarvestObjects set to ERROR + job_obj = HarvestJob.get(job['id']) + objs = job_obj.objects + for obj in objs: + if obj.state not in ('COMPLETE', 'ERROR'): + old_state = obj.state + obj.state = 'ERROR' + log.info('Harvest object changed state from "%s" to "%s": %s', + old_state, obj.state, obj.id) + else: + log.info('Harvest object not changed from "%s": %s', + obj.state, obj.id) + model.repo.commit_and_remove() + + job_obj = HarvestJob.get(job['id']) + return harvest_job_dictize(job_obj, context) + + @logic.side_effect_free def harvest_sources_reindex(context, data_dict): ''' @@ -430,7 +510,8 @@ def harvest_source_reindex(context, data_dict): context.update({'ignore_auth': True}) package_dict = logic.get_action('harvest_source_show')(context, {'id': harvest_source_id}) - log.debug('Updating search index for harvest source {0}'.format(harvest_source_id)) + log.debug('Updating search index for harvest source: {0}'.format( + package_dict.get('name') or harvest_source_id)) # Remove configuration values new_dict = {} diff --git a/ckanext/harvest/logic/auth/update.py b/ckanext/harvest/logic/auth/update.py index f394ba6..1bc208a 100644 --- a/ckanext/harvest/logic/auth/update.py +++ b/ckanext/harvest/logic/auth/update.py @@ -58,6 +58,16 @@ def harvest_jobs_run(context, data_dict): else: return {'success': True} + +def harvest_job_abort(context, data_dict): + ''' + Authorization check for aborting a running harvest job + + Same permissions as running one + ''' + return harvest_jobs_run(context, data_dict) + + def harvest_sources_reindex(context, data_dict): ''' Authorization check for reindexing all harvest sources diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 077e892..4a30a54 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -118,7 +118,13 @@ class HarvestSource(HarvestDomainObject): or "inactive". 
The harvesting processes are not fired on inactive sources.
     '''
-    pass
+    def __repr__(self):
+        return '<HarvestSource id=%s title=%s url=%s active=%r>' % \
+               (self.id, self.title, self.url, self.active)
+
+    def __str__(self):
+        return self.__repr__().encode('ascii', 'ignore')
+

 class HarvestJob(HarvestDomainObject):
     '''A Harvesting Job is performed in two phases. In first place, the
diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py
index aa0170e..d6a9f9a 100644
--- a/ckanext/harvest/queue.py
+++ b/ckanext/harvest/queue.py
@@ -16,7 +16,8 @@ log = logging.getLogger(__name__)
 assert not log.disabled

 __all__ = ['get_gather_publisher', 'get_gather_consumer',
-           'get_fetch_publisher', 'get_fetch_consumer']
+           'get_fetch_publisher', 'get_fetch_consumer',
+           'get_harvester']

 PORT = 5672
 USERID = 'guest'
@@ -233,42 +234,38 @@ def gather_callback(channel, method, header, body):
         # Send the harvest job to the plugins that implement
         # the Harvester interface, only if the source type
         # matches
-        harvester_found = False
-        for harvester in PluginImplementations(IHarvester):
-            if harvester.info()['name'] == job.source.type:
-                harvester_found = True
+        harvester = get_harvester(job.source.type)

-                try:
-                    harvest_object_ids = gather_stage(harvester, job)
-                except (Exception, KeyboardInterrupt):
-                    channel.basic_ack(method.delivery_tag)
-                    raise
+        if harvester:
+            try:
+                harvest_object_ids = gather_stage(harvester, job)
+            except (Exception, KeyboardInterrupt):
+                channel.basic_ack(method.delivery_tag)
+                raise

-                if not isinstance(harvest_object_ids, list):
-                    log.error('Gather stage failed')
-                    publisher.close()
-                    channel.basic_ack(method.delivery_tag)
-                    return False
+            if not isinstance(harvest_object_ids, list):
+                log.error('Gather stage failed')
+                publisher.close()
+                channel.basic_ack(method.delivery_tag)
+                return False

-                if len(harvest_object_ids) == 0:
-                    log.info('No harvest objects to fetch')
-                    publisher.close()
-                    channel.basic_ack(method.delivery_tag)
-                    return False
+            if len(harvest_object_ids) == 0:
+                log.info('No 
harvest objects to fetch') + publisher.close() + channel.basic_ack(method.delivery_tag) + return False - log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format( - len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:])) - for id in harvest_object_ids: - # Send the id to the fetch queue - publisher.send({'harvest_object_id':id}) - log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids))) + log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format( + len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:])) + for id in harvest_object_ids: + # Send the id to the fetch queue + publisher.send({'harvest_object_id':id}) + log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids))) - if not harvester_found: + else: # This can occur if you: - # * remove a harvester and it still has sources that are then - # refreshed - # * add a new harvester and restart CKAN but not the gather - # queue. + # * remove a harvester and it still has sources that are then refreshed + # * add a new harvester and restart CKAN but not the gather queue. msg = 'System error - No harvester could be found for source type %s' % job.source.type err = HarvestGatherError(message=msg,job=job) err.save() @@ -279,6 +276,12 @@ def gather_callback(channel, method, header, body): channel.basic_ack(method.delivery_tag) +def get_harvester(harvest_source_type): + for harvester in PluginImplementations(IHarvester): + if harvester.info()['name'] == harvest_source_type: + return harvester + + def gather_stage(harvester, job): '''Calls the harvester's gather_stage, returning harvest object ids, with some error handling. 
diff --git a/ckanext/harvest/tests/harvesters/test_base.py b/ckanext/harvest/tests/harvesters/test_base.py index 6e1da21..efb1a84 100644 --- a/ckanext/harvest/tests/harvesters/test_base.py +++ b/ckanext/harvest/tests/harvesters/test_base.py @@ -1,7 +1,7 @@ import re from nose.tools import assert_equal - +from ckanext.harvest import model as harvest_model from ckanext.harvest.harvesters.base import HarvesterBase try: from ckan.tests import helpers @@ -18,6 +18,7 @@ class TestGenNewName(object): @classmethod def setup_class(cls): helpers.reset_db() + harvest_model.setup() def test_basic(self): assert_equal(HarvesterBase._gen_new_name('Trees'), 'trees') @@ -31,6 +32,7 @@ class TestGenNewName(object): class TestEnsureNameIsUnique(object): def setup(self): helpers.reset_db() + harvest_model.setup() def test_no_existing_datasets(self): factories.Dataset(name='unrelated') diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index eb88505..89b3089 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -116,3 +116,21 @@ class TestCkanHarvester(object): assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'added') assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) + + def test_exclude_organizations(self): + config = {'organizations_filter_exclude': ['org1-id']} + results_by_guid = run_harvest( + url='http://localhost:%s' % mock_ckan.PORT, + harvester=CKANHarvester(), + config=json.dumps(config)) + assert 'dataset1-id' not in results_by_guid + assert mock_ckan.DATASETS[1]['id'] in results_by_guid + + def test_include_organizations(self): + config = {'organizations_filter_include': ['org1-id']} + results_by_guid = run_harvest( + url='http://localhost:%s' % mock_ckan.PORT, + harvester=CKANHarvester(), + config=json.dumps(config)) + assert 'dataset1-id' in results_by_guid + assert 
mock_ckan.DATASETS[1]['id'] not in results_by_guid