diff --git a/README.rst b/README.rst index b3263d3..da72490 100644 --- a/README.rst +++ b/README.rst @@ -341,11 +341,10 @@ following methods:: ''' implements(IHarvester) - def info(self): ''' - Harvesting implementations must provide this method, which will return a - dictionary containing different descriptors of the harvester. The + Harvesting implementations must provide this method, which will return + a dictionary containing different descriptors of the harvester. The returned dictionary should contain: * name: machine-readable name. This will be the value stored in the @@ -353,8 +352,8 @@ following methods:: harvester. * title: human-readable name. This will appear in the form's select box in the WUI. - * description: a small description of what the harvester does. This will - appear on the form as a guidance to the user. + * description: a small description of what the harvester does. This + will appear on the form as a guidance to the user. A complete example may be:: @@ -373,9 +372,10 @@ following methods:: [optional] - Harvesters can provide this method to validate the configuration entered in the - form. It should return a single string, which will be stored in the database. - Exceptions raised will be shown in the form's error messages. + Harvesters can provide this method to validate the configuration + entered in the form. It should return a single string, which will be + stored in the database. Exceptions raised will be shown in the form's + error messages. :param harvest_object_id: Config string coming from the form :returns: A string with the validated configuration options @@ -406,7 +406,7 @@ following methods:: def gather_stage(self, harvest_job): ''' - The gather stage will recieve a HarvestJob object and will be + The gather stage will receive a HarvestJob object and will be responsible for: - gathering all the necessary objects to fetch on a later. stage (e.g. for a CSW server, perform a GetRecords request) @@ -418,6 +418,8 @@ following methods:: - creating and storing any suitable HarvestGatherErrors that may occur. - returning a list with all the ids of the created HarvestObjects. + - to abort the harvest, create a HarvestGatherError and raise an + exception. Any created HarvestObjects will be deleted. :param harvest_job: HarvestJob object :returns: A list of HarvestObject ids @@ -432,10 +434,14 @@ following methods:: - saving the content in the provided HarvestObject. - creating and storing any suitable HarvestObjectErrors that may occur. - - returning True if everything went as expected, False otherwise. + - returning True if everything is ok (ie the object should now be + imported), "unchanged" if the object didn't need harvesting after + all (ie no error, but don't continue to import stage) or False if + there were errors. :param harvest_object: HarvestObject object - :returns: True if everything went right, False if errors were found + :returns: True if successful, 'unchanged' if nothing to import after + all, False if not successful ''' def import_stage(self, harvest_object): @@ -454,12 +460,15 @@ following methods:: objects of this harvest source if the action was successful. - creating and storing any suitable HarvestObjectErrors that may occur. - - returning True if the action was done, "unchanged" if nothing - was needed doing after all or False if there were errors. + - creating the HarvestObject - Package relation (if necessary) + - returning True if the action was done, "unchanged" if the object + didn't need harvesting after all or False if there were errors. + + NB You can run this stage repeatedly using 'paster harvest import'. :param harvest_object: HarvestObject object - :returns: True if the action was done, "unchanged" if nothing was - needed doing after all and False if something went wrong. + :returns: True if the action was done, "unchanged" if the object didn't + need harvesting after all or False if there were errors. ''' diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 1466c66..39be5ce 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -38,7 +38,7 @@ class Harvester(CkanCommand): harvester jobs - lists harvest jobs - harvester job_abort {source-id/name} + harvester job_abort {source-id/source-name/obj-id} - marks a job as "Aborted" so that the source can be restarted afresh. It ensures that the job's harvest objects status are also marked finished. You should ensure that neither the job nor its objects are @@ -67,7 +67,7 @@ class Harvester(CkanCommand): WARNING: if using Redis, this command purges all data in the current Redis database - harvester [-j] [-o] [--segments={segments}] import [{source-id}] + harvester [-j] [-o|-g|-p {id/guid}] [--segments={segments}] import [{source-id}] - perform the import stage with the last fetched objects, for a certain source or a single harvest object. Please note that no objects will be fetched from the remote server. It will only affect the objects @@ -75,6 +75,7 @@ class Harvester(CkanCommand): To import a particular harvest source, specify its id as an argument. To import a particular harvest object use the -o option. + To import a particular guid use the -g option. To import a particular package use the -p option. You will need to specify the -j flag in cases where the datasets are @@ -111,10 +112,13 @@ class Harvester(CkanCommand): action='store_true', default=False, help='Do not join harvest objects to existing datasets') self.parser.add_option('-o', '--harvest-object-id', dest='harvest_object_id', - default=False, help='Id of the harvest object to which perfom the import stage') + default=False, help='Id of the harvest object to which perform the import stage') self.parser.add_option('-p', '--package-id', dest='package_id', - default=False, help='Id of the package whose harvest object to perfom the import stage for') + default=False, help='Id of the package whose harvest object to perform the import stage for') + + self.parser.add_option('-g', '--guid', dest='guid', + default=False, help='Guid of the harvest object to which perform the import stage for') self.parser.add_option('--segments', dest='segments', default=False, help= @@ -358,19 +362,15 @@ class Harvester(CkanCommand): def job_abort(self): if len(self.args) >= 2: - source_id_or_name = unicode(self.args[1]) + job_or_source_id_or_name = unicode(self.args[1]) else: - print 'Please provide a source id' + print 'Please provide a job id or source name/id' sys.exit(1) - context = {'model': model, 'session': model.Session, - 'user': self.admin_user['name']} - source = get_action('harvest_source_show')( - context, {'id': source_id_or_name}) context = {'model': model, 'user': self.admin_user['name'], 'session': model.Session} - job = get_action('harvest_job_abort')(context, - {'source_id': source['id']}) + job = get_action('harvest_job_abort')( + context, {'id': job_or_source_id_or_name}) print 'Job status: {0}'.format(job['status']) def run_harvester(self): @@ -445,6 +445,7 @@ class Harvester(CkanCommand): 'source_id': source_id, 'harvest_object_id': self.options.harvest_object_id, 'package_id': self.options.package_id, + 'guid': self.options.guid, }) print '%s objects reimported' % objs_count diff --git a/ckanext/harvest/controllers/view.py b/ckanext/harvest/controllers/view.py index b01e04b..659727d 100644 --- a/ckanext/harvest/controllers/view.py +++ b/ckanext/harvest/controllers/view.py @@ -17,6 +17,7 @@ import ckan.lib.helpers as h, json from ckan.lib.base import BaseController, c, \ request, response, render, abort, redirect +from ckanext.harvest.logic import HarvestJobExists, HarvestSourceInactiveError from ckanext.harvest.plugin import DATASET_TYPE_NAME import logging @@ -62,13 +63,14 @@ class ViewController(BaseController): abort(404,_('Harvest source not found')) except p.toolkit.NotAuthorized: abort(401,self.not_auth_message) + except HarvestSourceInactiveError, e: + h.flash_error(_('Cannot create new harvest jobs on inactive ' + 'sources. First, please change the source status ' + 'to \'active\'.')) + except HarvestJobExists, e: + h.flash_notice(_('A harvest job has already been scheduled for ' + 'this source')) except Exception, e: - if 'Can not create jobs on inactive sources' in str(e): - h.flash_error(_('Cannot create new harvest jobs on inactive sources.' - + ' First, please change the source status to \'active\'.')) - elif 'There already is an unrun job for this source' in str(e): - h.flash_notice(_('A harvest job has already been scheduled for this source')) - else: msg = 'An error occurred: [%s]' % str(e) h.flash_error(msg) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 6f13f53..90a1b14 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -2,8 +2,7 @@ import logging import re import uuid -from sqlalchemy.sql import update,and_, bindparam -from sqlalchemy.exc import InvalidRequestError +from sqlalchemy.sql import update, bindparam from pylons import config from ckan import plugins as p @@ -12,28 +11,45 @@ from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH from ckan.logic import ValidationError, NotFound, get_action from ckan.logic.schema import default_create_package_schema -from ckan.lib.navl.validators import ignore_missing,ignore -from ckan.lib.munge import munge_title_to_name,substitute_ascii_equivalents +from ckan.lib.navl.validators import ignore_missing, ignore +from ckan.lib.munge import munge_title_to_name, substitute_ascii_equivalents -from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ - HarvestObjectError +from ckanext.harvest.model import (HarvestObject, HarvestGatherError, + HarvestObjectError) from ckan.plugins.core import SingletonPlugin, implements from ckanext.harvest.interfaces import IHarvester +if p.toolkit.check_ckan_version(min_version='2.3'): + from ckan.lib.munge import munge_tag +else: + # Fallback munge_tag for older ckan versions which don't have a decent + # munger + def _munge_to_length(string, min_length, max_length): + '''Pad/truncates a string''' + if len(string) < min_length: + string += '_' * (min_length - len(string)) + if len(string) > max_length: + string = string[:max_length] + return string + + def munge_tag(tag): + tag = substitute_ascii_equivalents(tag) + tag = tag.lower().strip() + tag = re.sub(r'[^a-zA-Z0-9\- ]', '', tag).replace(' ', '-') + tag = _munge_to_length(tag, model.MIN_TAG_LENGTH, model.MAX_TAG_LENGTH) + return tag log = logging.getLogger(__name__) -def munge_tag(tag): - tag = substitute_ascii_equivalents(tag) - tag = tag.lower().strip() - return re.sub(r'[^a-zA-Z0-9 -]', '', tag).replace(' ', '-') - - class HarvesterBase(SingletonPlugin): ''' - Generic class for harvesters with helper functions + Generic base class for harvesters, providing a number of useful functions. + + A harvester doesn't have to derive from this - it could just have: + + implements(IHarvester) ''' implements(IHarvester) @@ -136,31 +152,8 @@ class HarvesterBase(SingletonPlugin): return ideal_name[:PACKAGE_NAME_MAX_LENGTH-APPEND_MAX_CHARS] + \ str(uuid.uuid4())[:APPEND_MAX_CHARS] - - def _save_gather_error(self, message, job): - err = HarvestGatherError(message=message, job=job) - try: - err.save() - except InvalidRequestError: - Session.rollback() - err.save() - finally: - log.error(message) - - - def _save_object_error(self, message, obj, stage=u'Fetch', line=None): - err = HarvestObjectError(message=message, - object=obj, - stage=stage, - line=line) - try: - err.save() - except InvalidRequestError, e: - Session.rollback() - err.save() - finally: - log_message = '{0}, line {1}'.format(message,line) if line else message - log.debug(log_message) + _save_gather_error = HarvestGatherError.create + _save_object_error = HarvestObjectError.create def _get_user_name(self): ''' diff --git a/i18n/ckanext-harvest.pot b/ckanext/harvest/i18n/ckanext-harvest.pot similarity index 100% rename from i18n/ckanext-harvest.pot rename to ckanext/harvest/i18n/ckanext-harvest.pot diff --git a/i18n/sv/LC_MESSAGES/ckanext-harvest.mo b/ckanext/harvest/i18n/sv/LC_MESSAGES/ckanext-harvest.mo similarity index 100% rename from i18n/sv/LC_MESSAGES/ckanext-harvest.mo rename to ckanext/harvest/i18n/sv/LC_MESSAGES/ckanext-harvest.mo diff --git a/i18n/sv/LC_MESSAGES/ckanext-harvest.po b/ckanext/harvest/i18n/sv/LC_MESSAGES/ckanext-harvest.po similarity index 100% rename from i18n/sv/LC_MESSAGES/ckanext-harvest.po rename to ckanext/harvest/i18n/sv/LC_MESSAGES/ckanext-harvest.po diff --git a/ckanext/harvest/interfaces.py b/ckanext/harvest/interfaces.py index 99022ca..56ed790 100644 --- a/ckanext/harvest/interfaces.py +++ b/ckanext/harvest/interfaces.py @@ -1,5 +1,6 @@ from ckan.plugins.interfaces import Interface + class IHarvester(Interface): ''' Common harvesting interface @@ -8,8 +9,8 @@ class IHarvester(Interface): def info(self): ''' - Harvesting implementations must provide this method, which will return a - dictionary containing different descriptors of the harvester. The + Harvesting implementations must provide this method, which will return + a dictionary containing different descriptors of the harvester. The returned dictionary should contain: * name: machine-readable name. This will be the value stored in the @@ -17,8 +18,8 @@ class IHarvester(Interface): harvester. * title: human-readable name. This will appear in the form's select box in the WUI. - * description: a small description of what the harvester does. This will - appear on the form as a guidance to the user. + * description: a small description of what the harvester does. This + will appear on the form as a guidance to the user. A complete example may be:: @@ -37,9 +38,10 @@ class IHarvester(Interface): [optional] - Harvesters can provide this method to validate the configuration entered in the - form. It should return a single string, which will be stored in the database. - Exceptions raised will be shown in the form's error messages. + Harvesters can provide this method to validate the configuration + entered in the form. It should return a single string, which will be + stored in the database. Exceptions raised will be shown in the form's + error messages. :param harvest_object_id: Config string coming from the form :returns: A string with the validated configuration options @@ -70,7 +72,7 @@ class IHarvester(Interface): def gather_stage(self, harvest_job): ''' - The gather stage will recieve a HarvestJob object and will be + The gather stage will receive a HarvestJob object and will be responsible for: - gathering all the necessary objects to fetch on a later. stage (e.g. for a CSW server, perform a GetRecords request) @@ -82,6 +84,8 @@ class IHarvester(Interface): - creating and storing any suitable HarvestGatherErrors that may occur. - returning a list with all the ids of the created HarvestObjects. + - to abort the harvest, create a HarvestGatherError and raise an + exception. Any created HarvestObjects will be deleted. :param harvest_job: HarvestJob object :returns: A list of HarvestObject ids @@ -96,10 +100,14 @@ class IHarvester(Interface): - saving the content in the provided HarvestObject. - creating and storing any suitable HarvestObjectErrors that may occur. - - returning True if everything went as expected, False otherwise. + - returning True if everything is ok (ie the object should now be + imported), "unchanged" if the object didn't need harvesting after + all (ie no error, but don't continue to import stage) or False if + there were errors. :param harvest_object: HarvestObject object - :returns: True if everything went right, False if errors were found + :returns: True if successful, 'unchanged' if nothing to import after + all, False if not successful ''' def import_stage(self, harvest_object): @@ -118,10 +126,13 @@ class IHarvester(Interface): objects of this harvest source if the action was successful. - creating and storing any suitable HarvestObjectErrors that may occur. - - returning True if the action was done, "unchanged" if nothing - was needed doing after all or False if there were errors. + - creating the HarvestObject - Package relation (if necessary) + - returning True if the action was done, "unchanged" if the object + didn't need harvesting after all or False if there were errors. + + NB You can run this stage repeatedly using 'paster harvest import'. :param harvest_object: HarvestObject object - :returns: True if the action was done, "unchanged" if nothing was - needed doing after all and False if something went wrong. + :returns: True if the action was done, "unchanged" if the object didn't + need harvesting after all or False if there were errors. ''' diff --git a/ckanext/harvest/logic/__init__.py b/ckanext/harvest/logic/__init__.py index 98f60c8..4c00db2 100644 --- a/ckanext/harvest/logic/__init__.py +++ b/ckanext/harvest/logic/__init__.py @@ -8,3 +8,7 @@ except ImportError: class HarvestJobExists(Exception): pass + + +class HarvestSourceInactiveError(Exception): + pass diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py index c71cad0..26c5d01 100644 --- a/ckanext/harvest/logic/action/create.py +++ b/ckanext/harvest/logic/action/create.py @@ -4,15 +4,15 @@ import ckan from ckan.plugins import toolkit -from ckanext.harvest.logic import HarvestJobExists +from ckanext.harvest.logic import HarvestJobExists, HarvestSourceInactiveError from ckanext.harvest.plugin import DATASET_TYPE_NAME from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject, - HarvestObjectExtra) + HarvestObjectExtra) from ckanext.harvest.logic.dictization import (harvest_job_dictize, - harvest_object_dictize) -from ckanext.harvest.logic.schema import (harvest_source_show_package_schema, - harvest_object_create_schema) -from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list + harvest_object_dictize) +from ckanext.harvest.logic.schema import harvest_object_create_schema +from ckanext.harvest.logic.action.get import (harvest_source_list, + harvest_job_list) log = logging.getLogger(__name__) @@ -23,7 +23,8 @@ check_access = toolkit.check_access class InactiveSource(Exception): pass -def harvest_source_create(context,data_dict): + +def harvest_source_create(context, data_dict): ''' Creates a new harvest source @@ -76,8 +77,8 @@ def harvest_job_create(context, data_dict): Creates a Harvest Job for a Harvest Source and runs it (by putting it on the gather queue) - :param source_id: - :type param: string + :param source_id: id of the harvest source to create a job for + :type source_id: string :param run: whether to also run it or not (default: True) :type run: bool ''' @@ -97,7 +98,7 @@ def harvest_job_create(context, data_dict): if not source.active: log.warn('Harvest job cannot be created for inactive source %s', source_id) - raise Exception('Can not create jobs on inactive sources') + raise HarvestSourceInactiveError('Can not create jobs on inactive sources') # Check if there already is an unrun or currently running job for this # source @@ -131,20 +132,21 @@ def harvest_job_create_all(context, data_dict): ''' log.info('Harvest job create all: %r', data_dict) - check_access('harvest_job_create_all',context,data_dict) + check_access('harvest_job_create_all', context, data_dict) run = data_dict.get('run', True) - data_dict.update({'only_active':True}) + data_dict.update({'only_active': True}) # Get all active sources - sources = harvest_source_list(context,data_dict) + sources = harvest_source_list(context, data_dict) jobs = [] # Create a new job for each, if there isn't already one for source in sources: exists = _check_for_existing_jobs(context, source['id']) if exists: - log.info('Skipping source %s as it already has a pending job', source['id']) + log.info('Skipping source %s as it already has a pending job', + source['id']) continue job = harvest_job_create( @@ -155,6 +157,7 @@ def harvest_job_create_all(context, data_dict): 'and run ' if run else '', len(jobs)) return jobs + def _check_for_existing_jobs(context, source_id): ''' Given a source id, checks if there are jobs for this source @@ -162,20 +165,21 @@ def _check_for_existing_jobs(context, source_id): rtype: boolean ''' - data_dict ={ - 'source_id':source_id, - 'status':u'New' + data_dict = { + 'source_id': source_id, + 'status': u'New' } - exist_new = harvest_job_list(context,data_dict) - data_dict ={ - 'source_id':source_id, - 'status':u'Running' + exist_new = harvest_job_list(context, data_dict) + data_dict = { + 'source_id': source_id, + 'status': u'Running' } - exist_running = harvest_job_list(context,data_dict) + exist_running = harvest_job_list(context, data_dict) exist = len(exist_new + exist_running) > 0 return exist + def harvest_object_create(context, data_dict): ''' Create a new harvest object @@ -187,7 +191,8 @@ def harvest_object_create(context, data_dict): :type extras: dict (optional) ''' check_access('harvest_object_create', context, data_dict) - data, errors = _validate(data_dict, harvest_object_create_schema(), context) + data, errors = _validate(data_dict, harvest_object_create_schema(), + context) if errors: raise toolkit.ValidationError(errors) @@ -195,11 +200,11 @@ def harvest_object_create(context, data_dict): obj = HarvestObject( guid=data.get('guid'), content=data.get('content'), - job=data['job_id'], + job=data['job_id'], # which was validated into a HarvestJob object harvest_source_id=data.get('source_id'), package_id=data.get('package_id'), - extras=[ HarvestObjectExtra(key=k, value=v) - for k, v in data.get('extras', {}).items() ] + extras=[HarvestObjectExtra(key=k, value=v) + for k, v in data.get('extras', {}).items()] ) obj.save() diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py index 4fbb5ac..77a2de6 100644 --- a/ckanext/harvest/logic/action/get.py +++ b/ckanext/harvest/logic/action/get.py @@ -128,33 +128,8 @@ def harvest_source_list(context, data_dict): sources = _get_sources_for_user(context, data_dict) - context.update({'detailed':False}) return [harvest_source_dictize(source, context) for source in sources] -@side_effect_free -def harvest_source_for_a_dataset(context, data_dict): - ''' - TODO: Deprecated, harvest source id is added as an extra to each dataset - automatically - ''' - '''For a given dataset, return the harvest source that - created or last updated it, otherwise NotFound.''' - - model = context['model'] - session = context['session'] - - dataset_id = data_dict.get('id') - - query = session.query(HarvestSource)\ - .join(HarvestObject)\ - .filter_by(package_id=dataset_id)\ - .order_by(HarvestObject.gathered.desc()) - source = query.first() # newest - - if not source: - raise NotFound - - return harvest_source_dictize(source,context) @side_effect_free def harvest_job_show(context,data_dict): diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index afeb131..6088ae5 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -262,6 +262,8 @@ def harvest_objects_import(context, data_dict): :param source_id: the id of the harvest source to import :type source_id: string + :param guid: the guid of the harvest object to import + :type guid: string :param harvest_object_id: the id of the harvest object to import :type harvest_object_id: string :param package_id: the id or name of the package to import @@ -273,6 +275,7 @@ def harvest_objects_import(context, data_dict): model = context['model'] session = context['session'] source_id = data_dict.get('source_id') + guid = data_dict.get('guid') harvest_object_id = data_dict.get('harvest_object_id') package_id_or_name = data_dict.get('package_id') @@ -280,7 +283,13 @@ def harvest_objects_import(context, data_dict): join_datasets = context.get('join_datasets', True) - if source_id: + if guid: + last_objects_ids = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.guid == guid) \ + .filter(HarvestObject.current == True) + + elif source_id: source = HarvestSource.get(source_id) if not source: log.error('Harvest source %s does not exist', source_id) @@ -462,18 +471,17 @@ def harvest_send_job_to_gather_queue(context, data_dict): :type id: string ''' log.info('Send job to gather queue: %r', data_dict) - check_access('harvest_send_job_to_gather_queue', context, data_dict) job_id = logic.get_or_bust(data_dict, 'id') + job = toolkit.get_action('harvest_job_show')( + context, {'id': job_id}) + + check_access('harvest_send_job_to_gather_queue', context, job) # gather queue publisher = get_gather_publisher() - job = logic.get_action('harvest_job_show')( - context, {'id': job_id}) - # Check the source is active - context['detailed'] = False source = harvest_source_show(context, {'id': job['source_id']}) if not source['active']: raise toolkit.ValidationError('Source is not active') @@ -495,6 +503,11 @@ def harvest_job_abort(context, data_dict): marks them "ERROR", so any left in limbo are cleaned up. Does not actually stop running any queued harvest fetchs/objects. + Specify either id or source_id. + + :param id: the job id to abort, or the id or name of the harvest source + with a job to abort + :type id: string :param source_id: the name or id of the harvest source with a job to abort :type source_id: string ''' @@ -503,18 +516,25 @@ def harvest_job_abort(context, data_dict): model = context['model'] - source_id = data_dict.get('source_id') - source = harvest_source_show(context, {'id': source_id}) - - # HarvestJob set status to 'Finished' - # Don not use harvest_job_list since it can use a lot of memory - last_job = model.Session.query(HarvestJob) \ - .filter_by(source_id=source['id']) \ - .order_by(HarvestJob.created.desc()).first() - if not last_job: - raise NotFound('Error: source has no jobs') - job = get_action('harvest_job_show')(context, - {'id': last_job.id}) + source_or_job_id = data_dict.get('source_id') or data_dict.get('id') + if source_or_job_id: + try: + source = harvest_source_show(context, {'id': source_or_job_id}) + except NotFound: + job = get_action('harvest_job_show')( + context, {'id': source_or_job_id}) + else: + # HarvestJob set status to 'Aborted' + # Do not use harvest_job_list since it can use a lot of memory + # Get the most recent job for the source + job = model.Session.query(HarvestJob) \ + .filter_by(source_id=source['id']) \ + .order_by(HarvestJob.created.desc()).first() + if not job: + raise NotFound('Error: source has no jobs') + job_id = job.id + job = get_action('harvest_job_show')( + context, {'id': job_id}) if job['status'] != 'Finished': # i.e. New or Running diff --git a/ckanext/harvest/logic/auth/update.py b/ckanext/harvest/logic/auth/update.py index ae6c0fd..2bd70b9 100644 --- a/ckanext/harvest/logic/auth/update.py +++ b/ckanext/harvest/logic/auth/update.py @@ -66,7 +66,7 @@ def harvest_send_job_to_gather_queue(context, data_dict): It forwards the checks to harvest_job_create, ie the user can only run the job if she is allowed to create the job. ''' - from ckanext.harvest.auth.create import harvest_job_create + from ckanext.harvest.logic.auth.create import harvest_job_create return harvest_job_create(context, data_dict) diff --git a/ckanext/harvest/logic/dictization.py b/ckanext/harvest/logic/dictization.py index 3cb8de1..9c102ee 100644 --- a/ckanext/harvest/logic/dictization.py +++ b/ckanext/harvest/logic/dictization.py @@ -104,7 +104,6 @@ def _get_source_status(source, context): ''' model = context.get('model') - detailed = context.get('detailed', True) out = dict() @@ -114,11 +113,8 @@ def _get_source_status(source, context): 'job_count': 0, 'next_harvest': '', 'last_harvest_request': '', - 'last_harvest_statistics': - {'added': 0, 'updated': 0, 'errors': 0, 'deleted': 0}, - 'last_harvest_errors': {'gather': [], 'object': []}, 'overall_statistics': {'added': 0, 'errors': 0}, - 'packages': []} + } if not job_count: out['msg'] = 'No jobs yet' @@ -141,32 +137,6 @@ def _get_source_status(source, context): #TODO: Should we encode the dates as strings? out['last_harvest_request'] = str(last_job.gather_finished) - if detailed: - harvest_job_dict = harvest_job_dictize(last_job, context) - # No packages added or updated - statistics = out['last_harvest_statistics'] - statistics['added'] = harvest_job_dict['stats'].get('new', 0) - statistics['updated'] = harvest_job_dict['stats'].get('updated', 0) - statistics['deleted'] = harvest_job_dict['stats'].get('deleted', 0) - statistics['errors'] = ( - harvest_job_dict['stats'].get('errored', 0) + - len(last_job.gather_errors)) - - if detailed: - # We have the gathering errors in last_job.gather_errors, so let's - # also get also the object errors. - object_errors = model.Session.query(HarvestObjectError)\ - .join(HarvestObject) \ - .filter(HarvestObject.job == last_job) - for gather_error in last_job.gather_errors: - out['last_harvest_errors']['gather'].append(gather_error.message) - - for object_error in object_errors: - err = {'object_id': object_error.object.id, - 'object_guid': object_error.object.guid, - 'message': object_error.message} - out['last_harvest_errors']['object'].append(err) - # Overall statistics packages = model.Session.query(distinct(HarvestObject.package_id), Package.name) \ @@ -176,9 +146,6 @@ def _get_source_status(source, context): .filter(Package.state == u'active') out['overall_statistics']['added'] = packages.count() - if detailed: - for package in packages: - out['packages'].append(package.name) gather_errors = model.Session.query(HarvestGatherError) \ .join(HarvestJob).join(HarvestSource) \ diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index d12be67..e41c91c 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -34,6 +34,9 @@ def harvest_job_exists(value, context): def _normalize_url(url): + '''Strips off parameters off a URL, and an unnecessary port number, so that + simple variations on a URL are ignored, to used to help avoid getting two + harvesters for the same URL.''' o = urlparse.urlparse(url) # Normalize port @@ -118,8 +121,8 @@ def harvest_source_type_exists(value, context): available_types.append(info['name']) if not value in available_types: - raise Invalid('Unknown harvester type: %s. Have you registered a ' - 'harvester for this type?' % value) + raise Invalid('Unknown harvester type: %s. Registered types: %r' % + (value, available_types)) return value diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 5a813a5..a87eb58 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -10,6 +10,7 @@ from sqlalchemy import ForeignKey from sqlalchemy import types from sqlalchemy.engine.reflection import Inspector from sqlalchemy.orm import backref, relation +from sqlalchemy.exc import InvalidRequestError from ckan import model from ckan import logic @@ -153,13 +154,43 @@ class HarvestGatherError(HarvestDomainObject): '''Gather errors are raised during the **gather** stage of a harvesting job. ''' - pass + @classmethod + def create(cls, message, job): + ''' + Helper function to create an error object and save it. + ''' + err = cls(message=message, job=job) + try: + err.save() + except InvalidRequestError: + Session.rollback() + err.save() + finally: + # No need to alert administrator so don't log as an error + log.info(message) + class HarvestObjectError(HarvestDomainObject): '''Object errors are raised during the **fetch** or **import** stage of a harvesting job, and are referenced to a specific harvest object. ''' - pass + @classmethod + def create(cls, message, object, stage=u'Fetch', line=None): + ''' + Helper function to create an error object and save it. + ''' + err = cls(message=message, object=object, + stage=stage, line=line) + try: + err.save() + except InvalidRequestError: + Session.rollback() + err.save() + finally: + log_message = '{0}, line {1}'.format(message, line) \ + if line else message + log.debug(log_message) + def harvest_object_before_insert_listener(mapper,connection,target): ''' diff --git a/ckanext/harvest/plugin.py b/ckanext/harvest/plugin.py index a8489de..ff7405b 100644 --- a/ckanext/harvest/plugin.py +++ b/ckanext/harvest/plugin.py @@ -7,6 +7,12 @@ from ckan import logic from ckan import model import ckan.plugins as p from ckan.lib.plugins import DefaultDatasetForm +try: + from ckan.lib.plugins import DefaultTranslation +except ImportError: + class DefaultTranslation(): + pass + from ckan.lib.navl import dictization_functions from ckanext.harvest import logic as harvest_logic @@ -15,12 +21,14 @@ from ckanext.harvest.model import setup as model_setup from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject + log = getLogger(__name__) assert not log.disabled DATASET_TYPE_NAME = 'harvest' -class Harvest(p.SingletonPlugin, DefaultDatasetForm): + +class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation): p.implements(p.IConfigurable) p.implements(p.IRoutes, inherit=True) @@ -31,6 +39,9 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm): p.implements(p.IPackageController, inherit=True) p.implements(p.ITemplateHelpers) p.implements(p.IFacets, inherit=True) + if p.toolkit.check_ckan_version(min_version='2.5.0'): + p.implements(p.ITranslation, inherit=True) + startup = False diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 2b57be4..e4c098b 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -395,7 +395,7 @@ def fetch_and_import_stages(harvester, obj): success_fetch = harvester.fetch_stage(obj) obj.fetch_finished = datetime.datetime.utcnow() obj.save() - if success_fetch: + if success_fetch is True: # If no errors where found, call the import method obj.import_started = datetime.datetime.utcnow() obj.state = "IMPORT" @@ -411,6 +411,11 @@ def fetch_and_import_stages(harvester, obj): else: obj.state = "ERROR" obj.save() + elif success_fetch == 'unchanged': + obj.state = 'COMPLETE' + obj.report_status = 'not modified' + obj.save() + return else: obj.state = "ERROR" obj.save() diff --git a/ckanext/harvest/tests/factories.py b/ckanext/harvest/tests/factories.py index 5e77283..fc70f09 100644 --- a/ckanext/harvest/tests/factories.py +++ b/ckanext/harvest/tests/factories.py @@ -82,6 +82,9 @@ class HarvestObject(factory.Factory): if 'job_id' not in kwargs: kwargs['job_id'] = kwargs['job'].id kwargs['source_id'] = kwargs['job'].source.id + # Remove 'job' to avoid it getting added as a HarvestObjectExtra + if 'job' in kwargs: + kwargs.pop('job') job_dict = toolkit.get_action('harvest_object_create')( context, kwargs) if cls._return_type == 'dict': diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index 64ae101..0df24bc 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -64,6 +64,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): return self.respond_json(revision_ids) if self.path.startswith('/api/rest/revision/'): revision_ref = self.path.split('/')[-1] + assert api_version == 2 for rev in REVISIONS: if rev['id'] == revision_ref: return self.respond_json(rev) @@ -440,7 +441,7 @@ REVISIONS = [ "approved_timestamp": None, "packages": [ - DATASETS[1]['name'] + DATASETS[1]['id'] ], "groups": [ ] }, @@ -452,7 +453,7 @@ REVISIONS = [ "approved_timestamp": None, "packages": [ - DATASETS[1]['name'] + DATASETS[1]['id'] ], "groups": [ ] }] diff --git a/ckanext/harvest/tests/harvesters/test_base.py b/ckanext/harvest/tests/harvesters/test_base.py index efb1a84..37829a1 100644 --- a/ckanext/harvest/tests/harvesters/test_base.py +++ b/ckanext/harvest/tests/harvesters/test_base.py @@ -2,7 +2,7 @@ import re from nose.tools import assert_equal from ckanext.harvest import model as harvest_model -from ckanext.harvest.harvesters.base import HarvesterBase +from ckanext.harvest.harvesters.base import HarvesterBase, munge_tag try: from ckan.tests import helpers from ckan.tests import factories @@ -96,3 +96,30 @@ class TestEnsureNameIsUnique(object): name = _ensure_name_is_unique('trees', append_type='random-hex') # e.g. 'trees0b53f' assert re.match('trees[\da-f]{5}', name) + + +# taken from ckan/tests/lib/test_munge.py +class TestMungeTag: + + # (original, expected) + munge_list = [ + ('unchanged', 'unchanged'), + ('s', 's_'), # too short + ('some spaces here', 'some-spaces--here'), + ('random:other%characters&_.here', 'randomothercharactershere'), + ('river-water-dashes', 'river-water-dashes'), + ] + + def test_munge_tag(self): + '''Munge a list of tags gives expected results.''' + for org, exp in self.munge_list: + munge = munge_tag(org) + assert_equal(munge, exp) + + def test_munge_tag_multiple_pass(self): + '''Munge a list of tags muliple times gives expected results.''' + for org, exp in self.munge_list: + first_munge = munge_tag(org) + assert_equal(first_munge, exp) + second_munge = munge_tag(first_munge) + assert_equal(second_munge, exp) diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index 624380a..e03765f 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -106,7 +106,7 @@ class TestCkanHarvester(object): harvester=CKANHarvester()) # updated the dataset which has revisions - result = results_by_guid[mock_ckan.DATASETS[1]['name']] + result = results_by_guid[mock_ckan.DATASETS[1]['id']] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'updated') assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) @@ -155,7 +155,7 @@ class TestCkanHarvester(object): # The metadata_modified was the same for this dataset so the import # would have returned 'unchanged' - result = results_by_guid[mock_ckan.DATASETS[1]['name']] + result = results_by_guid[mock_ckan.DATASETS[1]['id']] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'not modified') assert 'dataset' not in result diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index 9cbfc36..1fc8c23 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -1,5 +1,4 @@ import json -import uuid import factories import unittest from nose.tools import assert_equal, assert_raises @@ -7,11 +6,12 @@ from nose.plugins.skip import SkipTest try: from ckan.tests import factories as ckan_factories - from ckan.tests.helpers import _get_test_app, reset_db, FunctionalTestBase + from ckan.tests.helpers import (_get_test_app, reset_db, + FunctionalTestBase, assert_in) except ImportError: from ckan.new_tests import factories as ckan_factories from ckan.new_tests.helpers import (_get_test_app, reset_db, - FunctionalTestBase) + FunctionalTestBase, assert_in) from ckan import plugins as p from ckan.plugins import toolkit from ckan import model @@ -403,6 +403,44 @@ class TestActions(ActionBase): toolkit.get_action('harvest_source_create')( {'user': site_user}, data_dict) + def test_harvest_job_create_as_sysadmin(self): + source = factories.HarvestSource(**SOURCE_DICT) + + site_user = toolkit.get_action('get_site_user')( + {'model': model, 'ignore_auth': True}, {})['name'] + data_dict = { + 'source_id': source['id'], + 'run': True + } + job = toolkit.get_action('harvest_job_create')( + {'user': site_user}, data_dict) + + assert_equal(job['source_id'], source['id']) + assert_equal(job['status'], 'Running') + assert_equal(job['gather_started'], None) + assert_in('stats', job.keys()) + + def test_harvest_job_create_as_admin(self): + # as if an admin user presses 'refresh' + user = ckan_factories.User() + user['capacity'] = 'admin' + org = ckan_factories.Organization(users=[user]) + source_dict = dict(SOURCE_DICT.items() + + [('publisher_id', org['id'])]) + source = factories.HarvestSource(**source_dict) + + data_dict = { + 'source_id': source['id'], + 'run': True + } + job = toolkit.get_action('harvest_job_create')( + {'user': user['name']}, data_dict) + + assert_equal(job['source_id'], source['id']) + assert_equal(job['status'], 'Running') + assert_equal(job['gather_started'], None) + assert_in('stats', job.keys()) + class TestHarvestObject(unittest.TestCase): @classmethod diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py index 6a86a3b..bb381c7 100644 --- a/ckanext/harvest/tests/test_queue.py +++ b/ckanext/harvest/tests/test_queue.py @@ -254,7 +254,6 @@ class TestHarvestQueue(object): ) assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1}) - context['detailed'] = True harvest_source_dict = logic.get_action('harvest_source_show')( context, {'id': harvest_source['id']} diff --git a/ckanext/harvest/tests/test_queue2.py b/ckanext/harvest/tests/test_queue2.py index 52251ef..c2df12c 100644 --- a/ckanext/harvest/tests/test_queue2.py +++ b/ckanext/harvest/tests/test_queue2.py @@ -35,6 +35,8 @@ class MockHarvester(p.SingletonPlugin): return [obj.id] def fetch_stage(self, harvest_object): + if self._test_params.get('fetch_object_unchanged'): + return 'unchanged' harvest_object.content = json.dumps({'name': harvest_object.guid}) harvest_object.save() return True @@ -81,14 +83,13 @@ class MockHarvester(p.SingletonPlugin): harvest_object.save() - if self._test_params.get('object_unchanged'): + if self._test_params.get('import_object_unchanged'): return 'unchanged' return True class TestEndStates(object): - @classmethod - def setup_class(cls): + def setup(self): reset_db() harvest_model.setup() @@ -155,9 +156,22 @@ class TestEndStates(object): assert_equal(result['report_status'], 'errored') assert_equal(result['errors'], []) - def test_unchanged(self): + def test_fetch_unchanged(self): guid = 'obj-error' - MockHarvester._set_test_params(guid=guid, object_unchanged=True) + MockHarvester._set_test_params(guid=guid, fetch_object_unchanged=True) + + results_by_guid = run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + + result = results_by_guid[guid] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'not modified') + assert_equal(result['errors'], []) + + def test_import_unchanged(self): + guid = 'obj-error' + MockHarvester._set_test_params(guid=guid, import_object_unchanged=True) results_by_guid = run_harvest( url='http://some-url.com',