diff --git a/README.rst b/README.rst
index 45ec53f..ae83525 100644
--- a/README.rst
+++ b/README.rst
@@ -31,24 +31,27 @@ Installation
 
       ckan.harvest.mq.type = amqp
 
+2. Activate your CKAN virtual environment, for example::
-2. Install the extension into your python environment::
 
+     $ . /usr/lib/ckan/default/bin/activate
+
+3. Install the ckanext-harvest Python package into your virtual environment::
 
       (pyenv) $ pip install -e git+https://github.com/ckan/ckanext-harvest.git#egg=ckanext-harvest
 
-3. Install the rest of python modules required by the extension::
+4. Install the Python modules required by the extension::
 
       (pyenv) $ pip install -r pip-requirements.txt
 
-4. Make sure the CKAN configuration ini file contains the harvest main plugin, as
+5. Make sure the CKAN configuration ini file contains the harvest main plugin, as
    well as the harvester for CKAN instances if you need it (included with the
    extension)::
 
-      ckan.plugins = harvest ckan_harvester
+      ckan.plugins = harvest ckan_harvester
 
-5. If you haven't done it yet on the previous step, define the backend that you are using with the ``ckan.harvest.mq.type``
-   option (it defaults to ``rabbitmq``)::
+6. If you haven't done it yet on the previous step, define the backend that you
+   are using with the ``ckan.harvest.mq.type`` option (it defaults to ``amqp``)::
 
-      ckan.harvest.mq.type = redis
+      ckan.harvest.mq.type = redis
 
 There are a number of configuration options available for the backends. These
 don't need to be modified at all if you are using the default Redis or
 RabbitMQ install (step 1).
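The backend selection that steps 1 and 6 describe boils down to reading ``ckan.harvest.mq.type`` from the config, defaulting to ``amqp``. A minimal sketch of that logic (``resolve_mq_type`` and ``SUPPORTED_MQ_TYPES`` are hypothetical names for illustration, not part of the extension):

```python
# Sketch of the backend selection implied by ``ckan.harvest.mq.type``.
# ``resolve_mq_type`` is a hypothetical helper, not ckanext-harvest code.

SUPPORTED_MQ_TYPES = ('amqp', 'redis')

def resolve_mq_type(config):
    """Return the configured queue backend, defaulting to 'amqp'."""
    mq_type = config.get('ckan.harvest.mq.type', 'amqp')
    if mq_type not in SUPPORTED_MQ_TYPES:
        raise ValueError('Unknown ckan.harvest.mq.type: %s' % mq_type)
    return mq_type

print(resolve_mq_type({}))                                    # amqp
print(resolve_mq_type({'ckan.harvest.mq.type': 'redis'}))     # redis
```

The real extension may accept other spellings for the backend name; this sketch only covers the two values the README mentions.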
 The list
@@ -96,25 +99,44 @@ The following operations can be run from the command line using the
 
       harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
         - create new harvest source
 
-      harvester rmsource {id}
-        - remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects
+      harvester source {source-id/name}
+        - shows a harvest source
 
-      harvester clearsource {id}
-        - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself
+      harvester rmsource {source-id/name}
+        - remove (deactivate) a harvester source, whilst leaving any related
+          datasets, jobs and objects
+
+      harvester clearsource {source-id/name}
+        - clears all datasets, jobs and objects related to a harvest source,
+          but keeps the source itself
 
       harvester sources [all]
         - lists harvest sources
           If 'all' is defined, it also shows the Inactive sources
 
-      harvester job {source-id}
+      harvester job {source-id/name}
        - create new harvest job
 
       harvester jobs
        - lists harvest jobs
 
+      harvester job_abort {source-id/name}
+        - marks a job as "Aborted" so that the source can be restarted afresh.
+          It ensures that the job's harvest objects are also marked as
+          finished. You should ensure that neither the job nor its objects are
+          currently in the gather/fetch queues.
+
       harvester run
-        - runs any scheduled harvest jobs and looks for jobs that can be marked
-          as finished
+        - starts any harvest jobs that have been created by putting them onto
+          the gather queue. Also checks running jobs - if finished, it
+          changes their status to Finished.
+
+      harvester run_test {source-id/name}
+        - runs a harvest - for testing only.
+          This does all the stages of the harvest (creates job, gather, fetch,
+          import) without involving the web UI or the queue backends. This is
+          useful for testing a harvester without having to fire up
+          gather/fetch_consumer processes, as is done in production.
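The job lifecycle implied by the commands above (``job`` creates a job, ``run`` sends it to the gather queue and later marks completed jobs Finished, ``job_abort`` force-finishes a stuck one) can be sketched as a toy transition table. This is illustrative only, not the extension's actual code, and the status names are the ones used in this patch:

```python
# Toy sketch of the harvest job lifecycle described by the commands above.
# Illustrative only - not the actual ckanext-harvest implementation.

TRANSITIONS = {
    ('New', 'run'): 'Running',         # `harvester run` puts the job on the gather queue
    ('Running', 'run'): 'Finished',    # a later `run` marks completed jobs Finished
    ('New', 'job_abort'): 'Finished',  # `job_abort` force-finishes a stuck job
    ('Running', 'job_abort'): 'Finished',
}

def next_status(status, command):
    """Return the job status after a command, or the unchanged status."""
    return TRANSITIONS.get((status, command), status)

print(next_status('New', 'run'))  # Running
```

In reality ``run`` only finishes a Running job once all its objects have reached a final state; the table above ignores that detail.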
 
       harvester gather_consumer
         - starts the consumer for the gathering queue
@@ -124,14 +146,22 @@ The following operations can be run from the command line using the
 
       harvester purge_queues
         - removes all jobs from fetch and gather queue
+          WARNING: if using Redis, this command purges all data in the current
+          Redis database
 
-      harvester [-j] [--segments={segments}] import [{source-id}]
-        - perform the import stage with the last fetched objects, optionally belonging to a certain source.
-          Please note that no objects will be fetched from the remote server. It will only affect
-          the last fetched objects already present in the database.
+      harvester [-j] [-o] [--segments={segments}] import [{source-id}]
+        - perform the import stage with the last fetched objects, for a certain
+          source or a single harvest object. Please note that no objects will
+          be fetched from the remote server. It will only affect the objects
+          already present in the database.
 
-          If the -j flag is provided, the objects are not joined to existing datasets. This may be useful
-          when importing objects for the first time.
+          To import a particular harvest source, specify its id as an argument.
+          To import a particular harvest object use the -o option.
+          To import a particular package use the -p option.
+
+          You will need to specify the -j flag in cases where the datasets are
+          not yet created (e.g. first harvest, or all previous harvests have
+          failed).
 
           The --segments flag allows to define a string containing hex digits that represent which of
           the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
@@ -424,14 +454,43 @@ Here you can also find other examples of custom harvesters:
 
 * https://github.com/ckan/ckanext-dcat/tree/master/ckanext/dcat/harvesters
 * https://github.com/ckan/ckanext-spatial/tree/master/ckanext/spatial/harvesters
 
-
 Running the harvest jobs
 ========================
 
-The harvesting extension uses two different queues, one that handles the
-gathering and another one that handles the fetching and importing. To start
-the consumers run the following command
-(make sure you have your python environment activated)::
+There are two ways to run a harvest:
+
+1. ``harvester run_test`` - from the command line, suitable for testing
+2. ``harvester run`` - used by the Web UI and scheduled runs
+
+harvester run_test
+------------------
+
+You can run a harvester simply using the ``run_test`` command. This is handy
+for running a harvest with one command in the console and seeing all the output
+inline. It runs the gather, fetch and import stages all in the same process.
+
+This is useful for developing a harvester because you can insert break-points
+in your harvester, and rerun a harvest without having to restart the
+gather_consumer and fetch_consumer processes each time. In addition, because it
+doesn't use the queue backends it doesn't interfere with harvests of other
+sources that may be going on in the background.
+
+However, when running this way, exceptions raised by gather_stage, fetch_stage
+or import_stage are not caught, whereas with ``harvester run`` they are handled
+slightly differently, since the stages are called by queue.py. So when testing
+this aspect it's best to use ``harvester run``.
+
+harvester run
+-------------
+
+When a harvest job is started by a user in the Web UI, or by a scheduled
+harvest, the harvest is started by the ``harvester run`` command. This is the
+normal method in production systems and scales well.
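The ``run_test`` behaviour described above (gather, fetch and import in one process, with no queue backend in between) can be sketched abstractly. The function and argument names below are illustrative, not the extension's API; the real single-process runner lives in ``ckanext.harvest.tests.lib.run_harvest_job``:

```python
# Toy sketch of what ``run_test`` does: run the three harvest stages
# sequentially in one process, with no gather/fetch queue in between.
# Illustrative only - not the ckanext-harvest implementation.

def run_harvest(gather_stage, fetch_stage, import_stage, job):
    """Run gather -> fetch -> import sequentially and return a summary."""
    object_ids = gather_stage(job)            # normally published to the gather queue
    fetched = [fetch_stage(obj_id) for obj_id in object_ids]
    imported = [import_stage(obj) for obj in fetched]
    return {'gathered': len(object_ids), 'imported': len(imported)}

# Stub stages stand in for a real IHarvester implementation:
report = run_harvest(
    gather_stage=lambda job: ['obj-1', 'obj-2'],
    fetch_stage=lambda obj_id: {'id': obj_id, 'content': '<xml/>'},
    import_stage=lambda obj: obj['id'],
    job='job-1')
print(report)  # {'gathered': 2, 'imported': 2}
```

Because everything happens in one process, a break-point or an uncaught exception in any stage surfaces immediately, which is exactly why ``run_test`` is convenient for development.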
+
+In this case, the harvesting extension uses two different queues: one that
+handles the gathering and another one that handles the fetching and importing.
+To start the consumers run the following command (make sure you have your
+Python environment activated)::
 
       paster --plugin=ckanext-harvest harvester gather_consumer --config=mysite.ini
 
@@ -448,9 +507,19 @@ The ``run`` command not only starts any pending harvesting jobs, but also flags
 those that are finished, allowing new jobs to be created on that particular
 source and refreshing the source statistics. That means that you will need to
 run this command before being able to create a new job on a source that was being
-harvested (On a production site you will typically have a cron job that runs the
+harvested. (On a production site you will typically have a cron job that runs the
 command regularly, see next section).
 
+Occasionally you may find a harvesting job in a "limbo state", where the job
+has run with errors but the ``harvester run`` command will not mark it as
+finished, and therefore you cannot run another job. This is usually due to a
+particular harvester not handling errors correctly, e.g. during development. In this
+circumstance, ensure that the gather & fetch consumers are running and have
+nothing more to consume, and then run this abort command with the name or id of
+the harvest source::
+
+      paster --plugin=ckanext-harvest harvester job_abort {source-id/name} --config=mysite.ini
+
 Setting up the harvesters on a production server
 ================================================
 
diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py
index ceceec4..1466c66 100644
--- a/ckanext/harvest/commands/harvester.py
+++ b/ckanext/harvest/commands/harvester.py
@@ -17,25 +17,44 @@ class Harvester(CkanCommand):
       harvester source {name} {url} {type} [{title}] [{active}] [{owner_org}] [{frequency}] [{config}]
         - create new harvest source
 
-      harvester rmsource {id}
-        - remove (deactivate) a harvester source, whilst leaving any related datasets, jobs and objects
+      harvester source {source-id/name}
+        - shows a harvest source
 
-      harvester clearsource {id}
-        - clears all datasets, jobs and objects related to a harvest source, but keeps the source itself
+      harvester rmsource {source-id/name}
+        - remove (deactivate) a harvester source, whilst leaving any related
+          datasets, jobs and objects
+
+      harvester clearsource {source-id/name}
+        - clears all datasets, jobs and objects related to a harvest source,
+          but keeps the source itself
 
       harvester sources [all]
         - lists harvest sources
           If 'all' is defined, it also shows the Inactive sources
 
-      harvester job {source-id}
+      harvester job {source-id/name}
        - create new harvest job and runs it (puts it on the gather queue)
 
      harvester jobs
        - lists harvest jobs
 
+      harvester job_abort {source-id/name}
+        - marks a job as "Aborted" so that the source can be restarted afresh.
+          It ensures that the job's harvest objects are also marked as
+          finished. You should ensure that neither the job nor its objects are
+          currently in the gather/fetch queues.
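The cleanup that ``job_abort`` performs on harvest object states (implemented later in this patch, in ``harvest_job_abort``) amounts to: anything not already COMPLETE or ERROR becomes ERROR, so objects left in limbo no longer block the job from finishing. A standalone sketch of that rule:

```python
# Standalone sketch of job_abort's object-state cleanup: any harvest object
# still in limbo (not COMPLETE or ERROR) is marked ERROR so the job can finish.

FINAL_STATES = ('COMPLETE', 'ERROR')

def abort_object_states(states):
    """Return the object states after an abort: limbo states become 'ERROR'."""
    return [s if s in FINAL_STATES else 'ERROR' for s in states]

print(abort_object_states(['COMPLETE', 'FETCH', 'WAITING', 'ERROR']))
# ['COMPLETE', 'ERROR', 'ERROR', 'ERROR']
```

The intermediate state names here are examples; the real check in the patch is simply ``obj.state not in ('COMPLETE', 'ERROR')``.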
+
       harvester run
-        - runs any scheduled harvest jobs and looks for jobs that can be marked
-          as finished
+        - starts any harvest jobs that have been created by putting them onto
+          the gather queue. Also checks running jobs - if finished, it
+          changes their status to Finished.
+
+      harvester run_test {source-id/name}
+        - runs a harvest - for testing only.
+          This does all the stages of the harvest (creates job, gather, fetch,
+          import) without involving the web UI or the queue backends. This is
+          useful for testing a harvester without having to fire up
+          gather/fetch_consumer processes, as is done in production.
 
       harvester gather_consumer
         - starts the consumer for the gathering queue
@@ -54,10 +73,13 @@ class Harvester(CkanCommand):
           be fetched from the remote server. It will only affect the objects
           already present in the database.
 
-          To perform it on a particular object use the -o flag.
+          To import a particular harvest source, specify its id as an argument.
+          To import a particular harvest object use the -o option.
+          To import a particular package use the -p option.
 
-          If the -j flag is provided, the objects are not joined to existing datasets. This may be useful
-          when importing objects for the first time.
+          You will need to specify the -j flag in cases where the datasets are
+          not yet created (e.g. first harvest, or all previous harvests have
+          failed).
 
           The --segments flag allows to define a string containing hex digits that represent which of
           the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
@@ -115,7 +137,10 @@ class Harvester(CkanCommand):
             sys.exit(1)
         cmd = self.args[0]
         if cmd == 'source':
-            self.create_harvest_source()
+            if len(self.args) > 2:
+                self.create_harvest_source()
+            else:
+                self.show_harvest_source()
         elif cmd == 'rmsource':
             self.remove_harvest_source()
         elif cmd == 'clearsource':
@@ -126,8 +151,12 @@
             self.create_harvest_job()
         elif cmd == 'jobs':
             self.list_harvest_jobs()
+        elif cmd == 'job_abort':
+            self.job_abort()
         elif cmd == 'run':
             self.run_harvester()
+        elif cmd == 'run_test':
+            self.run_test_harvest()
         elif cmd == 'gather_consumer':
             import logging
             from ckanext.harvest.queue import (get_gather_consumer,
@@ -249,25 +278,44 @@ class Harvester(CkanCommand):
             print str(e.error_dict)
             raise e
 
+    def show_harvest_source(self):
+
+        if len(self.args) >= 2:
+            source_id_or_name = unicode(self.args[1])
+        else:
+            print 'Please provide a source name'
+            sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+        self.print_harvest_source(source)
+
     def remove_harvest_source(self):
         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
         else:
             print 'Please provide a source id'
             sys.exit(1)
-        context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
-        get_action('harvest_source_delete')(context,{'id':source_id})
-        print 'Removed harvest source: %s' % source_id
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+        get_action('harvest_source_delete')(context, {'id': source['id']})
+        print 'Removed harvest source: %s' % source_id_or_name
 
     def clear_harvest_source(self):
         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
         else:
             print 'Please provide a source id'
             sys.exit(1)
-        context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session}
-        get_action('harvest_source_clear')(context,{'id':source_id})
-        print 'Cleared harvest source: %s' % source_id
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+        get_action('harvest_source_clear')(context, {'id': source['id']})
+        print 'Cleared harvest source: %s' % source_id_or_name
 
     def list_harvest_sources(self):
         if len(self.args) >= 2 and self.args[1] == 'all':
@@ -284,14 +332,18 @@ class Harvester(CkanCommand):
 
     def create_harvest_job(self):
         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
         else:
             print 'Please provide a source id'
             sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
 
         context = {'model': model,'session':model.Session, 'user': self.admin_user['name']}
         job = get_action('harvest_job_create')(
-            context, {'source_id': source_id, 'run': True})
+            context, {'source_id': source['id'], 'run': True})
         self.print_harvest_job(job)
 
         jobs = get_action('harvest_job_list')(context,{'status':u'New'})
@@ -304,23 +356,91 @@
         self.print_harvest_jobs(jobs)
         self.print_there_are(what='harvest job', sequence=jobs)
 
+    def job_abort(self):
+        if len(self.args) >= 2:
+            source_id_or_name = unicode(self.args[1])
+        else:
+            print 'Please provide a source id'
+            sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+
+        context = {'model': model, 'user': self.admin_user['name'],
+                   'session': model.Session}
+        job = get_action('harvest_job_abort')(context,
+                                              {'source_id': source['id']})
+        print 'Job status: {0}'.format(job['status'])
+
     def run_harvester(self):
         context = {'model': model, 'user': self.admin_user['name'],
                    'session': model.Session}
         get_action('harvest_jobs_run')(context, {})
 
+    def run_test_harvest(self):
+        from ckanext.harvest import queue
+        from ckanext.harvest.tests import lib
+        from ckanext.harvest.logic import HarvestJobExists
+        from ckanext.harvest.model import HarvestJob
+
+        # Determine the source
+        if len(self.args) >= 2:
+            source_id_or_name = unicode(self.args[1])
+        else:
+            print 'Please provide a source id'
+            sys.exit(1)
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name']}
+        source = get_action('harvest_source_show')(
+            context, {'id': source_id_or_name})
+
+        # Determine the job
+        try:
+            job_dict = get_action('harvest_job_create')(
+                context, {'source_id': source['id']})
+        except HarvestJobExists:
+            running_jobs = get_action('harvest_job_list')(
+                context, {'source_id': source['id'], 'status': 'Running'})
+            if running_jobs:
+                print '\nSource "%s" apparently has a "Running" job:\n%r' \
+                    % (source.get('name') or source['id'], running_jobs)
+                resp = raw_input('Abort it? (y/n)')
+                if not resp.lower().startswith('y'):
+                    sys.exit(1)
+                job_dict = get_action('harvest_job_abort')(
+                    context, {'source_id': source['id']})
+            else:
+                print 'Reusing existing harvest job'
+                jobs = get_action('harvest_job_list')(
+                    context, {'source_id': source['id'], 'status': 'New'})
+                assert len(jobs) == 1, \
+                    'Multiple "New" jobs for this source! %r' % jobs
+                job_dict = jobs[0]
+        job_obj = HarvestJob.get(job_dict['id'])
+
+        harvester = queue.get_harvester(source['source_type'])
+        assert harvester, \
+            'No harvester found for type: %s' % source['source_type']
+        lib.run_harvest_job(job_obj, harvester)
+
     def import_stage(self):
         if len(self.args) >= 2:
-            source_id = unicode(self.args[1])
+            source_id_or_name = unicode(self.args[1])
+            context = {'model': model, 'session': model.Session,
+                       'user': self.admin_user['name']}
+            source = get_action('harvest_source_show')(
+                context, {'id': source_id_or_name})
+            source_id = source['id']
         else:
             source_id = None
-        context = {'model': model, 'session':model.Session, 'user': self.admin_user['name'],
+        context = {'model': model, 'session': model.Session,
+                   'user': self.admin_user['name'],
                    'join_datasets': not self.options.no_join_datasets,
                    'segments': self.options.segments}
-
         objs_count = get_action('harvest_objects_import')(context,{
             'source_id': source_id,
             'harvest_object_id': self.options.harvest_object_id,
@@ -347,9 +467,16 @@ class Harvester(CkanCommand):
 
     def print_harvest_source(self, source):
         print 'Source id: %s' % source.get('id')
+        if 'name' in source:
+            # 'name' is only there if the source comes from the Package
+            print '     name: %s' % source.get('name')
         print '      url: %s' % source.get('url')
-        print '     type: %s' % source.get('type')
-        print '   active: %s' % (source.get('active', source.get('state') == 'active'))
+        # 'type' if source comes from HarvestSource, 'source_type' if it comes
+        # from the Package
+        print '     type: %s' % (source.get('source_type') or
+                                 source.get('type'))
+        print '   active: %s' % (source.get('active',
+                                            source.get('state') == 'active'))
         print 'frequency: %s' % source.get('frequency')
         print '     jobs: %s' % source.get('status').get('job_count')
         print ''
diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py
index 2dd3555..93ac839 100644
--- a/ckanext/harvest/logic/action/create.py
+++ b/ckanext/harvest/logic/action/create.py
@@ -95,13 +95,16 @@ def harvest_job_create(context, data_dict):
 
     # Check if the source is active
     if not source.active:
-        log.warn('Harvest job cannot be created for inactive source %s', source_id)
+        log.warn('Harvest job cannot be created for inactive source %s',
+                 source_id)
         raise Exception('Can not create jobs on inactive sources')
 
-    # Check if there already is an unrun or currently running job for this source
+    # Check if there already is an unrun or currently running job for this
+    # source
     exists = _check_for_existing_jobs(context, source_id)
     if exists:
-        log.warn('There is already an unrun job %r for this source %s', exists, source_id)
+        log.warn('There is already an unrun job %r for this source %s',
+                 exists, source_id)
         raise HarvestJobExists('There already is an unrun job for this source')
 
     job = HarvestJob()
diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py
index 2ddcdd9..4fbb5ac 100644
--- a/ckanext/harvest/logic/action/get.py
+++ b/ckanext/harvest/logic/action/get.py
@@ -20,7 +20,7 @@ from ckanext.harvest.logic.dictization import (harvest_source_dictize,
 log = logging.getLogger(__name__)
 
 @side_effect_free
-def harvest_source_show(context,data_dict):
+def harvest_source_show(context, data_dict):
     '''
     Returns the metadata of a harvest source
 
@@ -234,6 +234,11 @@ def harvest_job_report(context, data_dict):
 
 @side_effect_free
 def harvest_job_list(context,data_dict):
+    '''Returns a list of jobs and details of objects and errors.
+
+    :param status: filter by e.g. "New" or "Finished" jobs
+    :param source_id: filter by a harvest source
+    '''
 
     check_access('harvest_job_list',context,data_dict)
 
@@ -315,6 +320,7 @@ def harvest_object_list(context,data_dict):
 
 @side_effect_free
 def harvesters_info_show(context,data_dict):
+    '''Returns details of the installed harvesters.'''
 
     check_access('harvesters_info_show',context,data_dict)
 
diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py
index f13cb9c..763f4e3 100644
--- a/ckanext/harvest/logic/action/update.py
+++ b/ckanext/harvest/logic/action/update.py
@@ -83,10 +83,12 @@ def harvest_source_update(context,data_dict):
 
     return source
 
-def harvest_source_clear(context,data_dict):
+
+def harvest_source_clear(context, data_dict):
     '''
-    Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself.
-    This is useful to clean history of long running harvest sources to start again fresh.
+    Clears all datasets, jobs and objects related to a harvest source, but
+    keeps the source itself. This is useful to clean history of long running
+    harvest sources to start again fresh.
 
     :param id: the id of the harvest source to clear
     :type id: string
@@ -94,7 +96,7 @@ def harvest_source_clear(context,data_dict):
     '''
     check_access('harvest_source_clear',context,data_dict)
 
-    harvest_source_id = data_dict.get('id',None)
+    harvest_source_id = data_dict.get('id', None)
 
     source = HarvestSource.get(harvest_source_id)
     if not source:
@@ -182,6 +184,14 @@ def harvest_source_clear(context,data_dict):
     return {'id': harvest_source_id}
 
 def harvest_source_index_clear(context,data_dict):
+    '''
+    Clears the search index for the given harvest source.
+
+    :param id: the id of the harvest source to clear
+    :type id: string
+    '''
     check_access('harvest_source_clear',context,data_dict)
 
     harvest_source_id = data_dict.get('id',None)
@@ -208,14 +218,26 @@ def harvest_source_index_clear(context,data_dict):
 
     return {'id': harvest_source_id}
 
-def harvest_objects_import(context,data_dict):
+
+def harvest_objects_import(context, data_dict):
     '''
-    Reimports the current harvest objects
-    It performs the import stage with the last fetched objects, optionally
-    belonging to a certain source.
-    Please note that no objects will be fetched from the remote server.
-    It will only affect the last fetched objects already present in the
-    database.
+    Reimports the existing harvest objects, specified by either source_id,
+    harvest_object_id or package_id.
+
+    It performs the import stage with the last fetched objects, optionally
+    belonging to a certain source.
+
+    Please note that no objects will be fetched from the remote server.
+
+    It will only affect the last fetched objects already present in the
+    database.
+
+    :param source_id: the id of the harvest source to import
+    :type source_id: string
+    :param harvest_object_id: the id of the harvest object to import
+    :type harvest_object_id: string
+    :param package_id: the id or name of the package to import
+    :type package_id: string
     '''
     log.info('Harvest objects import: %r', data_dict)
    check_access('harvest_objects_import',context,data_dict)
@@ -426,6 +448,64 @@ def harvest_send_job_to_gather_queue(context, data_dict):
     return harvest_job_dictize(job_obj, context)
 
 
+def harvest_job_abort(context, data_dict):
+    '''
+    Aborts a harvest job. Given a harvest source_id, it looks for the latest
+    one and (assuming it is not already Finished) marks it as Finished. It
+    also marks any of that source's harvest objects that are not COMPLETE or
+    ERROR as "ERROR", so any left in limbo are cleaned up. Does not actually
+    stop any queued harvest fetches/objects from running.
+
+    :param source_id: the name or id of the harvest source with a job to abort
+    :type source_id: string
+    '''
+
+    check_access('harvest_job_abort', context, data_dict)
+
+    model = context['model']
+
+    source_id = data_dict.get('source_id', None)
+    source = harvest_source_show(context, {'id': source_id})
+
+    # HarvestJob set status to 'Finished'
+    # Do not use harvest_job_list since it can use a lot of memory
+    last_job = model.Session.query(HarvestJob) \
+                    .filter_by(source_id=source['id']) \
+                    .order_by(HarvestJob.created.desc()).first()
+    if not last_job:
+        raise NotFound('Error: source has no jobs')
+    job = get_action('harvest_job_show')(context,
+                                         {'id': last_job.id})
+
+    if job['status'] != 'Finished':
+        # i.e. New or Running
+        job_obj = HarvestJob.get(job['id'])
+        job_obj.status = new_status = 'Finished'
+        model.repo.commit_and_remove()
+        log.info('Harvest job changed status from "%s" to "%s"',
+                 job['status'], new_status)
+    else:
+        log.info('Harvest job unchanged. Source %s status is: "%s"',
+                 job['id'], job['status'])
+
+    # HarvestObjects set to ERROR
+    job_obj = HarvestJob.get(job['id'])
+    objs = job_obj.objects
+    for obj in objs:
+        if obj.state not in ('COMPLETE', 'ERROR'):
+            old_state = obj.state
+            obj.state = 'ERROR'
+            log.info('Harvest object changed state from "%s" to "%s": %s',
+                     old_state, obj.state, obj.id)
+        else:
+            log.info('Harvest object not changed from "%s": %s',
+                     obj.state, obj.id)
+    model.repo.commit_and_remove()
+
+    job_obj = HarvestJob.get(job['id'])
+    return harvest_job_dictize(job_obj, context)
+
+
 @logic.side_effect_free
 def harvest_sources_reindex(context, data_dict):
     '''
@@ -463,7 +543,8 @@ def harvest_source_reindex(context, data_dict):
     context.update({'ignore_auth': True})
     package_dict = logic.get_action('harvest_source_show')(context,
                                                            {'id': harvest_source_id})
-    log.debug('Updating search index for harvest source {0}'.format(harvest_source_id))
+    log.debug('Updating search index for harvest source: {0}'.format(
+        package_dict.get('name') or harvest_source_id))
 
     # Remove configuration values
     new_dict = {}
diff --git a/ckanext/harvest/logic/auth/update.py b/ckanext/harvest/logic/auth/update.py
index 586149f..ae6c0fd 100644
--- a/ckanext/harvest/logic/auth/update.py
+++ b/ckanext/harvest/logic/auth/update.py
@@ -70,6 +70,15 @@ def harvest_send_job_to_gather_queue(context, data_dict):
     return harvest_job_create(context, data_dict)
 
 
+def harvest_job_abort(context, data_dict):
+    '''
+    Authorization check for aborting a running harvest job
+
+    Same permissions as running one
+    '''
+    return harvest_jobs_run(context, data_dict)
+
+
 def harvest_sources_reindex(context, data_dict):
     '''
     Authorization check for reindexing all harvest sources
diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py
index 077e892..4a30a54 100644
--- a/ckanext/harvest/model/__init__.py
+++ b/ckanext/harvest/model/__init__.py
@@ -118,7 +118,13 @@ class HarvestSource(HarvestDomainObject):
        or "inactive". The harvesting processes are not fired on inactive
        sources.
     '''
-    pass
+    def __repr__(self):
+        return '<HarvestSource id=%s title=%s url=%s active=%r>' % \
+               (self.id, self.title, self.url, self.active)
+
+    def __str__(self):
+        return self.__repr__().encode('ascii', 'ignore')
+
 
 class HarvestJob(HarvestDomainObject):
     '''A Harvesting Job is performed in two phases. In first place, the
diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py
index 5fc6de8..5d063f1 100644
--- a/ckanext/harvest/queue.py
+++ b/ckanext/harvest/queue.py
@@ -14,8 +14,9 @@ from ckanext.harvest.interfaces import IHarvester
 log = logging.getLogger(__name__)
 assert not log.disabled
 
-__all__ = ['get_gather_publisher', 'get_gather_consumer', \
-           'get_fetch_publisher', 'get_fetch_consumer']
+__all__ = ['get_gather_publisher', 'get_gather_consumer',
+           'get_fetch_publisher', 'get_fetch_consumer',
+           'get_harvester']
 
 PORT = 5672
 USERID = 'guest'
@@ -230,42 +231,38 @@ def gather_callback(channel, method, header, body):
         # Send the harvest job to the plugins that implement
         # the Harvester interface, only if the source type
         # matches
-        harvester_found = False
-        for harvester in PluginImplementations(IHarvester):
-            if harvester.info()['name'] == job.source.type:
-                harvester_found = True
+        harvester = get_harvester(job.source.type)
 
-                try:
-                    harvest_object_ids = gather_stage(harvester, job)
-                except (Exception, KeyboardInterrupt):
-                    channel.basic_ack(method.delivery_tag)
-                    raise
+        if harvester:
+            try:
+                harvest_object_ids = gather_stage(harvester, job)
+            except (Exception, KeyboardInterrupt):
+                channel.basic_ack(method.delivery_tag)
+                raise
 
-                if not isinstance(harvest_object_ids, list):
-                    log.error('Gather stage failed')
-                    publisher.close()
-                    channel.basic_ack(method.delivery_tag)
-                    return False
+            if not isinstance(harvest_object_ids, list):
+                log.error('Gather stage failed')
+                publisher.close()
+                channel.basic_ack(method.delivery_tag)
+                return False
 
-                if len(harvest_object_ids) == 0:
-                    log.info('No harvest objects to fetch')
-                    publisher.close()
-                    channel.basic_ack(method.delivery_tag)
-                    return False
+            if len(harvest_object_ids) == 0:
+                log.info('No harvest objects to fetch')
+                publisher.close()
+                channel.basic_ack(method.delivery_tag)
+                return False
 
-                log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
-                    len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
-                for id in harvest_object_ids:
-                    # Send the id to the fetch queue
-                    publisher.send({'harvest_object_id':id})
-                log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
+            log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
+                len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
+            for id in harvest_object_ids:
+                # Send the id to the fetch queue
+                publisher.send({'harvest_object_id':id})
+            log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
 
-        if not harvester_found:
+        else:
             # This can occur if you:
-            # * remove a harvester and it still has sources that are then
-            #   refreshed
-            # * add a new harvester and restart CKAN but not the gather
-            #   queue.
+            # * remove a harvester and it still has sources that are then refreshed
+            # * add a new harvester and restart CKAN but not the gather queue.
             msg = 'System error - No harvester could be found for source type %s' % job.source.type
             err = HarvestGatherError(message=msg,job=job)
             err.save()
@@ -276,6 +273,12 @@ def gather_callback(channel, method, header, body):
     channel.basic_ack(method.delivery_tag)
 
 
+def get_harvester(harvest_source_type):
+    for harvester in PluginImplementations(IHarvester):
+        if harvester.info()['name'] == harvest_source_type:
+            return harvester
+
+
 def gather_stage(harvester, job):
     '''Calls the harvester's gather_stage, returning harvest object ids, with
     some error handling.
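The new ``get_harvester`` helper is a first-match lookup over the registered plugins, returning ``None`` when no harvester declares the requested source type. The same pattern in isolation, with a plain list and a stub class standing in for ``PluginImplementations(IHarvester)`` and a real harvester plugin:

```python
# The lookup pattern behind get_harvester(), in isolation: return the first
# registered harvester whose declared name matches the source type, else None.
# ``CkanHarvester`` here is a stub standing in for a real IHarvester plugin.

class CkanHarvester(object):
    def info(self):
        return {'name': 'ckan', 'title': 'CKAN', 'description': 'Harvests remote CKAN instances'}

def get_harvester(harvest_source_type, plugins):
    # ``plugins`` stands in for PluginImplementations(IHarvester)
    for harvester in plugins:
        if harvester.info()['name'] == harvest_source_type:
            return harvester
    return None  # implicit in the original; callers must handle this case

plugins = [CkanHarvester()]
print(get_harvester('ckan', plugins) is plugins[0])  # True
print(get_harvester('unknown', plugins))             # None
```

Returning ``None`` for an unknown type is why both ``gather_callback`` and ``run_test_harvest`` in this patch check the result before using it.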