From 39ce744368dac3f14e0a7159fbe17ddc57cec2e2 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Thu, 29 Oct 2015 17:18:51 +0000 Subject: [PATCH 01/19] Modified to make PEP-8 compliant. --- ckanext/harvest/logic/validators.py | 73 +++++++++++++++++++---------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index 4dec758..369caa0 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -14,14 +14,16 @@ from ckan.lib.navl.validators import keep_extras log = logging.getLogger(__name__) + def harvest_source_id_exists(value, context): - result = HarvestSource.get(value,None) + result = HarvestSource.get(value, None) if not result: raise Invalid('Harvest Source with id %r does not exist.' % str(value)) return value + def harvest_job_exists(value, context): """Check if a harvest job exists and returns the model if it does""" result = HarvestJob.get(value, None) @@ -30,6 +32,7 @@ def harvest_job_exists(value, context): raise Invalid('Harvest Job with id %r does not exist.' % str(value)) return result + def _normalize_url(url): o = urlparse.urlparse(url) @@ -51,11 +54,12 @@ def _normalize_url(url): o.scheme, netloc, path, - None,None,None)) + None, None, None)) return check_url -def harvest_source_url_validator(key,data,errors,context): + +def harvest_source_url_validator(key, data, errors, context): package = context.get("package") if package: @@ -64,44 +68,54 @@ def harvest_source_url_validator(key,data,errors,context): package_id = data.get(key[:-1] + ("id",)) new_url = _normalize_url(data[key]) - #pkg_id = data.get(('id',),'') + # pkg_id = data.get(('id',),'') q = model.Session.query(model.Package.url, model.Package.state) \ - .filter(model.Package.type==DATASET_TYPE_NAME) + .filter(model.Package.type == DATASET_TYPE_NAME) if package_id: # When editing a source we need to avoid its own URL - q = q.filter(model.Package.id!=package_id) + q = q.filter(model.Package.id != package_id) existing_sources = q.all() for url, state in existing_sources: url = _normalize_url(url) if url == new_url: - raise Invalid('There already is a Harvest Source for this URL: %s' % data[key]) + raise Invalid( + 'There already is a Harvest Source for this URL: %s' + % data[key] + ) return data[key] -def harvest_source_type_exists(value,context): - #TODO: use new description interface + +def harvest_source_type_exists(value, context): + # TODO: use new description interface # Get all the registered harvester types available_types = [] for harvester in PluginImplementations(IHarvester): info = harvester.info() if not info or 'name' not in info: - log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester)) + log.error( + 'Harvester %r does not provide the harvester name in the info ' + 'response' % str(harvester) + ) continue available_types.append(info['name']) - - if not value in available_types: - raise Invalid('Unknown harvester type: %s. Have you registered a harvester for this type?' % value) + if value not in available_types: + raise Invalid( + 'Unknown harvester type: %s. Have you registered a harvester for ' + 'this type?' % value + ) return value -def harvest_source_config_validator(key,data,errors,context): - harvester_type = data.get(('source_type',),'') + +def harvest_source_config_validator(key, data, errors, context): + harvester_type = data.get(('source_type',), '') for harvester in PluginImplementations(IHarvester): info = harvester.info() if info['name'] == harvester_type: @@ -109,21 +123,24 @@ def harvest_source_config_validator(key,data,errors,context): try: return harvester.validate_config(data[key]) except Exception, e: - raise Invalid('Error parsing the configuration options: %s' % str(e)) + raise Invalid( + 'Error parsing the configuration options: %s' % str(e)) else: return data[key] + def keep_not_empty_extras(key, data, errors, context): extras = data.pop(key, {}) for extras_key, value in extras.iteritems(): if value: data[key[:-1] + (extras_key,)] = value -def harvest_source_extra_validator(key,data,errors,context): - harvester_type = data.get(('source_type',),'') - #gather all extra fields to use as whitelist of what - #can be added to top level data_dict +def harvest_source_extra_validator(key, data, errors, context): + harvester_type = data.get(('source_type',), '') + + # gather all extra fields to use as whitelist of what + # can be added to top level data_dict all_extra_fields = set() for harvester in PluginImplementations(IHarvester): if not hasattr(harvester, 'extra_schema'): @@ -142,7 +159,7 @@ def harvest_source_extra_validator(key,data,errors,context): extra_data, extra_errors = validate(data.get(key, {}), extra_schema) for key in extra_data.keys(): - #only allow keys that appear in at least one harvester + # only allow keys that appear in at least one harvester if key not in all_extra_fields: extra_data.pop(key) @@ -152,8 +169,8 @@ def harvest_source_extra_validator(key,data,errors,context): for key, value in extra_errors.iteritems(): errors[(key,)] = value - ## need to get config out of extras as __extra runs - ## after rest of validation + # need to get config out of extras as __extra runs + # after rest of validation package_extras = data.get(('extras',), []) for num, extra in enumerate(list(package_extras)): @@ -177,21 +194,24 @@ def harvest_source_extra_validator(key,data,errors,context): if package_extras: data[('extras',)] = package_extras -def harvest_source_convert_from_config(key,data,errors,context): + +def harvest_source_convert_from_config(key, data, errors, context): config = data[key] if config: config_dict = json.loads(config) for key, value in config_dict.iteritems(): data[(key,)] = value -def harvest_source_active_validator(value,context): - if isinstance(value,basestring): + +def harvest_source_active_validator(value, context): + if isinstance(value, basestring): if value.lower() == 'true': return True else: return False return bool(value) + def harvest_source_frequency_exists(value): if value == '': value = 'MANUAL' @@ -205,6 +225,7 @@ def dataset_type_exists(value): value = DATASET_TYPE_NAME return value + def harvest_object_extras_validator(value, context): if not isinstance(value, dict): raise Invalid('extras must be a dict') From 2c41293c9cd5826b5a9a41abfde7a4c52b2c1b5a Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Thu, 29 Oct 2015 18:30:51 +0000 Subject: [PATCH 02/19] Updated the validator to check for unique sets as well as URL. --- ckanext/harvest/logic/validators.py | 32 +++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index 369caa0..65a11ce 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -60,6 +60,11 @@ def _normalize_url(url): def harvest_source_url_validator(key, data, errors, context): + """Validate the provided harvest source URL. + + Checks that the URL is not already existing with the same config. + """ + package = context.get("package") if package: @@ -67,21 +72,36 @@ def harvest_source_url_validator(key, data, errors, context): else: package_id = data.get(key[:-1] + ("id",)) - new_url = _normalize_url(data[key]) - # pkg_id = data.get(('id',),'') + try: + new_config = data.get(key[:-1] + ('config',)) + new_config_dict = json.loads(new_config) + new_config_set = new_config_dict.get('set', None) + except: + new_config_set = None - q = model.Session.query(model.Package.url, model.Package.state) \ + new_url = _normalize_url(data[key]) + + # q = model.Session.query(model.Package.url, model.Package.state) \ + q = model.Session.query(HarvestSource.url, HarvestSource.config) \ .filter(model.Package.type == DATASET_TYPE_NAME) if package_id: - # When editing a source we need to avoid its own URL + # When editing a source we need to avoid its own URL. q = q.filter(model.Package.id != package_id) existing_sources = q.all() - for url, state in existing_sources: + for url, conf in existing_sources: url = _normalize_url(url) - if url == new_url: + try: + config_dict = json.loads(conf) + config_set = config_dict.get('set', None) + except: + config_set = None + + if url == new_url and config_set == new_config_set: + # You can have a duplicate URL if it's pointing to a unique + # set as it will be harvesting unique datasets. raise Invalid( 'There already is a Harvest Source for this URL: %s' % data[key] From 55325f5940ee3643a97c0409ea85ccb77949c0ae Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Fri, 30 Oct 2015 11:59:24 +0000 Subject: [PATCH 03/19] Updated harvest source url validator to allow for duplicate URL's with unique configs. --- ckanext/harvest/logic/validators.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index 65a11ce..a88d46b 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -74,10 +74,8 @@ def harvest_source_url_validator(key, data, errors, context): try: new_config = data.get(key[:-1] + ('config',)) - new_config_dict = json.loads(new_config) - new_config_set = new_config_dict.get('set', None) except: - new_config_set = None + new_config = None new_url = _normalize_url(data[key]) @@ -93,13 +91,8 @@ def harvest_source_url_validator(key, data, errors, context): for url, conf in existing_sources: url = _normalize_url(url) - try: - config_dict = json.loads(conf) - config_set = config_dict.get('set', None) - except: - config_set = None - if url == new_url and config_set == new_config_set: + if url == new_url and conf == new_config: # You can have a duplicate URL if it's pointing to a unique # set as it will be harvesting unique datasets. raise Invalid( From 02b81187dfcb91783dcbcb06c1a503791d7c3537 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Fri, 30 Oct 2015 15:15:41 +0000 Subject: [PATCH 04/19] Fixed bug with deleting harvest source's which have a custom configuration. Added PEP-8 compliance. --- ckanext/harvest/logic/action/delete.py | 15 ++- ckanext/harvest/logic/action/update.py | 157 ++++++++++++++----------- 2 files changed, 98 insertions(+), 74 deletions(-) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 405aa3a..3e34aff 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -5,8 +5,9 @@ from ckan import plugins as p log = logging.getLogger(__name__) + def harvest_source_delete(context, data_dict): - ''' + """ Deletes an existing harvest source This method just proxies the request to package_delete, @@ -19,16 +20,14 @@ def harvest_source_delete(context, data_dict): :returns: the newly created harvest source :rtype: dictionary - ''' + """ + log.info('Deleting harvest source: %r', data_dict) p.toolkit.check_access('harvest_source_delete', context, data_dict) - p.toolkit.get_action('package_delete')(context, data_dict) - if context.get('clear_source', False): - - # We need the id, the name won't work + # We need the id, the name won't work. package_dict = p.toolkit.get_action('package_show')(context, data_dict) - - p.toolkit.get_action('harvest_source_clear')(context, {'id': package_dict['id']}) + p.toolkit.get_action('harvest_source_clear')( + context, {'id': package_dict['id']}) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 9e871d9..75d99d5 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -29,12 +29,14 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError from ckanext.harvest.logic.schema import harvest_source_show_package_schema -from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user +from ckanext.harvest.logic.action.get import harvest_source_show, \ + harvest_job_list, _get_sources_for_user log = logging.getLogger(__name__) -def harvest_source_update(context,data_dict): + +def harvest_source_update(context, data_dict): ''' Updates an existing harvest source @@ -83,18 +85,22 @@ def harvest_source_update(context,data_dict): return source -def harvest_source_clear(context,data_dict): + +def harvest_source_clear(context, data_dict): ''' - Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. - This is useful to clean history of long running harvest sources to start again fresh. + Clears all datasets, jobs and objects related to a harvest source, but + keeps the source itself. + This is useful to clean history of long running harvest sources to start + again fresh. :param id: the id of the harvest source to clear :type id: string ''' - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) + check_access('harvest_source_clear', context, data_dict) + + harvest_source_id = data_dict.get('id', None) source = HarvestSource.get(harvest_source_id) if not source: @@ -108,14 +114,18 @@ def harvest_source_clear(context,data_dict): model = context['model'] - sql = "select id from related where id in (select related_id from related_dataset where dataset_id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}'));".format(harvest_source_id=harvest_source_id) + sql = "select id from related where id in (select related_id from " \ + "related_dataset where dataset_id in (select package_id from " \ + "harvest_object where harvest_source_id = " \ + "'{harvest_source_id}'));".format( + harvest_source_id=harvest_source_id) result = model.Session.execute(sql) ids = [] for row in result: ids.append(row[0]) related_ids = "('" + "','".join(ids) + "')" - sql = '''begin; + sql = '''begin; update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format( harvest_source_id=harvest_source_id) @@ -129,15 +139,15 @@ def harvest_source_clear(context,data_dict): # Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3) else: sql += ''' - delete from resource_revision where resource_group_id in - (select id from resource_group where package_id in + delete from resource_revision where resource_group_id in + (select id from resource_group where package_id in (select id from package where state = 'to_delete')); - delete from resource where resource_group_id in - (select id from resource_group where package_id in + delete from resource where resource_group_id in + (select id from resource_group where package_id in (select id from package where state = 'to_delete')); - delete from resource_group_revision where package_id in + delete from resource_group_revision where package_id in (select id from package where state = 'to_delete'); - delete from resource_group where package_id in + delete from resource_group where package_id in (select id from package where state = 'to_delete'); ''' # CKAN pre-2.5: authz models were removed in migration 078 @@ -181,10 +191,11 @@ def harvest_source_clear(context,data_dict): return {'id': harvest_source_id} -def harvest_source_index_clear(context,data_dict): - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) +def harvest_source_index_clear(context, data_dict): + + check_access('harvest_source_clear', context, data_dict) + harvest_source_id = data_dict.get('id', None) source = HarvestSource.get(harvest_source_id) if not source: @@ -194,8 +205,8 @@ def harvest_source_index_clear(context,data_dict): harvest_source_id = source.id conn = make_connection() - query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id, - config.get('ckan.site_id')) + query = ''' +%s:"%s" +site_id:"%s" ''' % ( + 'harvest_source_id', harvest_source_id, config.get('ckan.site_id')) try: conn.delete_query(query) if asbool(config.get('ckan.search.solr_commit', 'true')): @@ -208,7 +219,8 @@ def harvest_source_index_clear(context,data_dict): return {'id': harvest_source_id} -def harvest_objects_import(context,data_dict): + +def harvest_objects_import(context, data_dict): ''' Reimports the current harvest objects It performs the import stage with the last fetched objects, optionally @@ -217,18 +229,19 @@ def harvest_objects_import(context,data_dict): It will only affect the last fetched objects already present in the database. ''' + log.info('Harvest objects import: %r', data_dict) - check_access('harvest_objects_import',context,data_dict) + check_access('harvest_objects_import', context, data_dict) model = context['model'] session = context['session'] - source_id = data_dict.get('source_id',None) - harvest_object_id = data_dict.get('harvest_object_id',None) - package_id_or_name = data_dict.get('package_id',None) + source_id = data_dict.get('source_id', None) + harvest_object_id = data_dict.get('harvest_object_id', None) + package_id_or_name = data_dict.get('package_id', None) - segments = context.get('segments',None) + segments = context.get('segments', None) - join_datasets = context.get('join_datasets',True) + join_datasets = context.get('join_datasets', rue) if source_id: source = HarvestSource.get(source_id) @@ -241,42 +254,43 @@ def harvest_objects_import(context,data_dict): raise Exception('This harvest source is not active') last_objects_ids = session.query(HarvestObject.id) \ - .join(HarvestSource) \ - .filter(HarvestObject.source==source) \ - .filter(HarvestObject.current==True) + .join(HarvestSource) \ + .filter(HarvestObject.source == source) \ + .filter(HarvestObject.current == True) elif harvest_object_id: last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.id==harvest_object_id) + .filter(HarvestObject.id == harvest_object_id) elif package_id_or_name: last_objects_ids = session.query(HarvestObject.id) \ .join(Package) \ - .filter(HarvestObject.current==True) \ - .filter(Package.state==u'active') \ - .filter(or_(Package.id==package_id_or_name, - Package.name==package_id_or_name)) + .filter(HarvestObject.current == True) \ + .filter(Package.state == u'active') \ + .filter(or_(Package.id == package_id_or_name, + Package.name == package_id_or_name)) join_datasets = False else: last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.current==True) + .filter(HarvestObject.current == True) if join_datasets: last_objects_ids = last_objects_ids.join(Package) \ - .filter(Package.state==u'active') + .filter(Package.state == u'active') last_objects_ids = last_objects_ids.all() last_objects_count = 0 for obj_id in last_objects_ids: - if segments and str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: + if segments and \ + str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: continue obj = session.query(HarvestObject).get(obj_id) for harvester in PluginImplementations(IHarvester): if harvester.info()['name'] == obj.source.type: - if hasattr(harvester,'force_import'): + if hasattr(harvester, 'force_import'): harvester.force_import = True harvester.import_stage(obj) break @@ -284,6 +298,7 @@ def harvest_objects_import(context,data_dict): log.info('Harvest objects imported: %s', last_objects_count) return last_objects_count + def _caluclate_next_run(frequency): now = datetime.datetime.utcnow() @@ -296,7 +311,7 @@ def _caluclate_next_run(frequency): if frequency == 'DAILY': return now + datetime.timedelta(days=1) if frequency == 'MONTHLY': - if now.month in (4,6,9,11): + if now.month in (4, 6, 9, 11): days = 30 elif now.month == 2: if now.year % 4 == 0: @@ -325,13 +340,14 @@ def _make_scheduled_jobs(context, data_dict): source.next_run = _caluclate_next_run(source.frequency) source.save() -def harvest_jobs_run(context,data_dict): + +def harvest_jobs_run(context, data_dict): log.info('Harvest job run: %r', data_dict) - check_access('harvest_jobs_run',context,data_dict) + check_access('harvest_jobs_run', context, data_dict) session = context['session'] - source_id = data_dict.get('source_id',None) + source_id = data_dict.get('source_id', None) if not source_id: _make_scheduled_jobs(context, data_dict) @@ -339,14 +355,15 @@ def harvest_jobs_run(context,data_dict): context['return_objects'] = False # Flag finished jobs as such - jobs = harvest_job_list(context,{'source_id':source_id,'status':u'Running'}) + jobs = harvest_job_list( + context, {'source_id': source_id, 'status': u'Running'}) if len(jobs): for job in jobs: if job['gather_finished']: objects = session.query(HarvestObject.id) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(and_((HarvestObject.state!=u'COMPLETE'), - (HarvestObject.state!=u'ERROR'))) \ + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(and_((HarvestObject.state != u'COMPLETE'), + (HarvestObject.state != u'ERROR'))) \ .order_by(HarvestObject.import_finished.desc()) if objects.count() == 0: @@ -354,23 +371,24 @@ def harvest_jobs_run(context,data_dict): job_obj.status = u'Finished' last_object = session.query(HarvestObject) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(HarvestObject.import_finished!=None) \ - .order_by(HarvestObject.import_finished.desc()) \ - .first() + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(HarvestObject.import_finished != None) \ + .order_by(HarvestObject.import_finished.desc()) \ + .first() if last_object: job_obj.finished = last_object.import_finished job_obj.save() # Reindex the harvest source dataset so it has the latest # status - get_action('harvest_source_reindex')(context, - {'id': job_obj.source.id}) + get_action('harvest_source_reindex')( + context, {'id': job_obj.source.id}) # resubmit old redis tasks resubmit_jobs() # Check if there are pending harvest jobs - jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'}) + jobs = harvest_job_list( + context, {'source_id': source_id, 'status': u'New'}) if len(jobs) == 0: log.info('No new harvest jobs.') raise NoNewHarvestJobError('There are no new harvesting jobs') @@ -380,7 +398,7 @@ def harvest_jobs_run(context,data_dict): sent_jobs = [] for job in jobs: context['detailed'] = False - source = harvest_source_show(context,{'id':job['source_id']}) + source = harvest_source_show(context, {'id': job['source_id']}) if source['active']: job_obj = HarvestJob.get(job['id']) job_obj.status = job['status'] = u'Running' @@ -404,23 +422,25 @@ def harvest_sources_reindex(context, data_dict): model = context['model'] packages = model.Session.query(model.Package) \ - .filter(model.Package.type==DATASET_TYPE_NAME) \ - .filter(model.Package.state==u'active') \ + .filter(model.Package.type == DATASET_TYPE_NAME) \ + .filter(model.Package.state == u'active') \ .all() package_index = PackageSearchIndex() reindex_context = {'defer_commit': True} for package in packages: - get_action('harvest_source_reindex')(reindex_context, {'id': package.id}) + get_action('harvest_source_reindex')( + reindex_context, {'id': package.id}) package_index.commit() return True + @logic.side_effect_free def harvest_source_reindex(context, data_dict): - '''Reindex a single harvest source''' + """Reindex a single harvest source.""" harvest_source_id = logic.get_or_bust(data_dict, 'id') defer_commit = context.get('defer_commit', False) @@ -428,18 +448,23 @@ def harvest_source_reindex(context, data_dict): if 'extras_as_string'in context: del context['extras_as_string'] context.update({'ignore_auth': True}) - package_dict = logic.get_action('harvest_source_show')(context, - {'id': harvest_source_id}) - log.debug('Updating search index for harvest source {0}'.format(harvest_source_id)) + package_dict = logic.get_action('harvest_source_show')( + context, {'id': harvest_source_id}) + log.debug('Updating search index for harvest source {0}'.format( + harvest_source_id)) # Remove configuration values new_dict = {} - if package_dict.get('config'): + if package_dict.get('config', None): config = json.loads(package_dict['config']) for key, value in package_dict.iteritems(): - if key not in config: - new_dict[key] = value + if value: + if value and key not in config: + new_dict[key] = value + package_index = PackageSearchIndex() - package_index.index_package(new_dict, defer_commit=defer_commit) + package_index.index_package( + new_dict, + defer_commit=defer_commit) return True From 3f37ae5f45c8f4ded7e587f746273bf99cd691cd Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Fri, 30 Oct 2015 16:11:25 +0000 Subject: [PATCH 05/19] Corrected docstring. --- ckanext/harvest/logic/action/delete.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 3e34aff..cf22363 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -14,12 +14,6 @@ def harvest_source_delete(context, data_dict): which will delete the actual harvest type dataset and the HarvestSource object (via the after_delete extension point). - :param id: the name or id of the harvest source to delete - :type id: string - - :returns: the newly created harvest source - :rtype: dictionary - """ log.info('Deleting harvest source: %r', data_dict) From a6069d93db9a6ff4b452ea41a93af3c26309cabf Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Fri, 30 Oct 2015 16:59:04 +0000 Subject: [PATCH 06/19] Fixed bug where the harvest source url validator would validate against all harvest sources that were ever created instead of just sources that were currently enabled. --- ckanext/harvest/logic/validators.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index a88d46b..aa5ca15 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -79,8 +79,8 @@ def harvest_source_url_validator(key, data, errors, context): new_url = _normalize_url(data[key]) - # q = model.Session.query(model.Package.url, model.Package.state) \ - q = model.Session.query(HarvestSource.url, HarvestSource.config) \ + q = model.Session.query( + model.Package.id, model.Package.url) \ .filter(model.Package.type == DATASET_TYPE_NAME) if package_id: @@ -89,8 +89,14 @@ def harvest_source_url_validator(key, data, errors, context): existing_sources = q.all() - for url, conf in existing_sources: + for uid, url in existing_sources: url = _normalize_url(url) + conf = model.Session.query(HarvestSource.config).filter( + HarvestSource.id == uid).first() + if conf: + conf = conf[0] + else: + conf = None if url == new_url and conf == new_config: # You can have a duplicate URL if it's pointing to a unique From 0c19acba789a085621f2ec854be5ad1e51cb0725 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Mon, 2 Nov 2015 15:50:04 +0000 Subject: [PATCH 07/19] Changed double quotes to single quotes in docstrings. --- ckanext/harvest/logic/action/delete.py | 6 ++---- ckanext/harvest/logic/action/update.py | 2 +- ckanext/harvest/logic/validators.py | 7 ++++--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index cf22363..5b1842a 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -7,14 +7,12 @@ log = logging.getLogger(__name__) def harvest_source_delete(context, data_dict): - """ - Deletes an existing harvest source + '''Deletes an existing harvest source. This method just proxies the request to package_delete, which will delete the actual harvest type dataset and the HarvestSource object (via the after_delete extension point). - - """ + ''' log.info('Deleting harvest source: %r', data_dict) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 75d99d5..c1f260e 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -440,7 +440,7 @@ def harvest_sources_reindex(context, data_dict): @logic.side_effect_free def harvest_source_reindex(context, data_dict): - """Reindex a single harvest source.""" + '''Reindex a single harvest source.''' harvest_source_id = logic.get_or_bust(data_dict, 'id') defer_commit = context.get('defer_commit', False) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index aa5ca15..01e73ef 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -25,7 +25,8 @@ def harvest_source_id_exists(value, context): def harvest_job_exists(value, context): - """Check if a harvest job exists and returns the model if it does""" + '''Check if a harvest job exists and returns the model if it does''' + result = HarvestJob.get(value, None) if not result: @@ -60,10 +61,10 @@ def _normalize_url(url): def harvest_source_url_validator(key, data, errors, context): - """Validate the provided harvest source URL. + '''Validate the provided harvest source URL. Checks that the URL is not already existing with the same config. - """ + ''' package = context.get("package") From 1702cf2f09d3f2b8f27f094ba5b81abe03f8a8d0 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Mon, 2 Nov 2015 15:51:25 +0000 Subject: [PATCH 08/19] Remove ', None' on .get() calls because it's the default value. --- ckanext/harvest/logic/action/update.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index c1f260e..26f35d8 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -100,7 +100,7 @@ def harvest_source_clear(context, data_dict): check_access('harvest_source_clear', context, data_dict) - harvest_source_id = data_dict.get('id', None) + harvest_source_id = data_dict.get('id') source = HarvestSource.get(harvest_source_id) if not source: @@ -195,7 +195,7 @@ def harvest_source_clear(context, data_dict): def harvest_source_index_clear(context, data_dict): check_access('harvest_source_clear', context, data_dict) - harvest_source_id = data_dict.get('id', None) + harvest_source_id = data_dict.get('id') source = HarvestSource.get(harvest_source_id) if not source: @@ -235,11 +235,11 @@ def harvest_objects_import(context, data_dict): model = context['model'] session = context['session'] - source_id = data_dict.get('source_id', None) - harvest_object_id = data_dict.get('harvest_object_id', None) - package_id_or_name = data_dict.get('package_id', None) + source_id = data_dict.get('source_id',) + harvest_object_id = data_dict.get('harvest_object_id') + package_id_or_name = data_dict.get('package_id') - segments = context.get('segments', None) + segments = context.get('segments') join_datasets = context.get('join_datasets', rue) @@ -347,7 +347,7 @@ def harvest_jobs_run(context, data_dict): session = context['session'] - source_id = data_dict.get('source_id', None) + source_id = data_dict.get('source_id') if not source_id: _make_scheduled_jobs(context, data_dict) @@ -455,7 +455,7 @@ def harvest_source_reindex(context, data_dict): # Remove configuration values new_dict = {} - if package_dict.get('config', None): + if package_dict.get('config'): config = json.loads(package_dict['config']) for key, value in package_dict.iteritems(): if value: From 443d690ac80804bdd55a8cd99781f69502d561fa Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Mon, 2 Nov 2015 16:45:16 +0000 Subject: [PATCH 09/19] Fixed big typo error. --- ckanext/harvest/logic/action/update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index d734cf7..284433e 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -285,7 +285,7 @@ def harvest_objects_import(context, data_dict): segments = context.get('segments') - join_datasets = context.get('join_datasets', rue) + join_datasets = context.get('join_datasets') if source_id: source = HarvestSource.get(source_id) From 7ffd6748f36a2ab3341e66835d3e896b67f2c905 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Mon, 2 Nov 2015 16:59:43 +0000 Subject: [PATCH 10/19] Corrected docstring params field, duplicate if statement and deleting keys for blank values. --- ckanext/harvest/logic/action/delete.py | 2 ++ ckanext/harvest/logic/action/update.py | 7 +++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 5b1842a..88ae8b9 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -12,6 +12,8 @@ def harvest_source_delete(context, data_dict): This method just proxies the request to package_delete, which will delete the actual harvest type dataset and the HarvestSource object (via the after_delete extension point). + + :param id: the name or id of the harvest source to delete ''' log.info('Deleting harvest source: %r', data_dict) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 284433e..7c21905 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -285,7 +285,7 @@ def harvest_objects_import(context, data_dict): segments = context.get('segments') - join_datasets = context.get('join_datasets') + join_datasets = context.get('join_datasets', True) if source_id: source = HarvestSource.get(source_id) @@ -560,9 +560,8 @@ def harvest_source_reindex(context, data_dict): if package_dict.get('config'): config = json.loads(package_dict['config']) for key, value in package_dict.iteritems(): - if value: - if value and key not in config: - new_dict[key] = value + if key not in config: + new_dict[key] = value package_index = PackageSearchIndex() package_index.index_package( From 208d1c41852478e6ba4708249870edbf40fa868d Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Tue, 3 Nov 2015 17:31:00 +0000 Subject: [PATCH 11/19] Setting back to master. --- ckanext/harvest/logic/action/update.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 7c21905..8280419 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -550,8 +550,8 @@ def harvest_source_reindex(context, data_dict): if 'extras_as_string'in context: del context['extras_as_string'] context.update({'ignore_auth': True}) - package_dict = logic.get_action('harvest_source_show')( - context, {'id': harvest_source_id}) + package_dict = logic.get_action('harvest_source_show')(context, + {'id': harvest_source_id}) log.debug('Updating search index for harvest source: {0}'.format( package_dict.get('name') or harvest_source_id)) @@ -562,10 +562,7 @@ def harvest_source_reindex(context, data_dict): for key, value in package_dict.iteritems(): if key not in config: new_dict[key] = value - package_index = PackageSearchIndex() - package_index.index_package( - new_dict, - defer_commit=defer_commit) + package_index.index_package(new_dict, defer_commit=defer_commit) return True From 4f71612002e027e2f771e45051dce9b8dd374630 Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 20:30:11 +0000 Subject: [PATCH 12/19] PEP8 based on #174 --- ckanext/harvest/logic/action/create.py | 6 +- ckanext/harvest/logic/action/delete.py | 5 +- ckanext/harvest/logic/action/update.py | 239 +++++++++++++++---------- ckanext/harvest/logic/validators.py | 69 ++++--- 4 files changed, 188 insertions(+), 131 deletions(-) diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py index 203b9eb..d904c5c 100644 --- a/ckanext/harvest/logic/action/create.py +++ b/ckanext/harvest/logic/action/create.py @@ -148,15 +148,15 @@ def _check_for_existing_jobs(context, source_id): return exist def harvest_object_create(context, data_dict): - """ Create a new harvest object + ''' Create a new harvest object :type guid: string (optional) :type content: string (optional) - :type job_id: string + :type job_id: string :type source_id: string (optional) :type package_id: string (optional) :type extras: dict (optional) - """ + ''' check_access('harvest_object_create', context, data_dict) data, errors = _validate(data_dict, harvest_object_create_schema(), context) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 405aa3a..36eff6b 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -2,9 +2,9 @@ import logging from ckan import plugins as p - log = logging.getLogger(__name__) + def harvest_source_delete(context, data_dict): ''' Deletes an existing harvest source @@ -31,4 +31,5 @@ def harvest_source_delete(context, data_dict): # We need the id, the name won't work package_dict = p.toolkit.get_action('package_show')(context, data_dict) - p.toolkit.get_action('harvest_source_clear')(context, {'id': package_dict['id']}) + p.toolkit.get_action('harvest_source_clear')( + context, {'id': package_dict['id']}) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index bca776c..b51f37f 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -29,12 +29,13 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError from ckanext.harvest.logic.dictization import harvest_job_dictize -from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user - +from ckanext.harvest.logic.action.get import ( + harvest_source_show, harvest_job_list, _get_sources_for_user) log = logging.getLogger(__name__) -def harvest_source_update(context,data_dict): + +def harvest_source_update(context, data_dict): ''' Updates an existing harvest source @@ -69,10 +70,8 @@ def harvest_source_update(context,data_dict): type. Should be a serialized as JSON. (optional) :type config: string - :returns: the newly created harvest source :rtype: dictionary - ''' log.info('Updating harvest source: %r', data_dict) @@ -94,9 +93,10 @@ def harvest_source_clear(context, data_dict): :type id: string ''' - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id', None) + check_access('harvest_source_clear', context, data_dict) + + harvest_source_id = data_dict.get('id') source = HarvestSource.get(harvest_source_id) if not source: @@ -110,68 +110,96 @@ def harvest_source_clear(context, data_dict): model = context['model'] - sql = "select id from related where id in (select related_id from related_dataset where dataset_id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}'));".format(harvest_source_id=harvest_source_id) + sql = '''select id from related where id in ( + select related_id from related_dataset where dataset_id in ( + select package_id from harvest_object + where harvest_source_id = '{harvest_source_id}'));'''.format( + harvest_source_id=harvest_source_id) result = model.Session.execute(sql) ids = [] for row in result: ids.append(row[0]) related_ids = "('" + "','".join(ids) + "')" - sql = '''begin; - update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format( + sql = '''begin; + update package set state = 'to_delete' where id in ( + select package_id from harvest_object + where harvest_source_id = '{harvest_source_id}');'''.format( harvest_source_id=harvest_source_id) # CKAN-2.3 or above: delete resource views, resource revisions & resources if toolkit.check_ckan_version(min_version='2.3'): sql += ''' - delete from resource_view where resource_id in (select id from resource where package_id in (select id from package where state = 'to_delete' )); - delete from resource_revision where package_id in (select id from package where state = 'to_delete' ); - delete from resource where package_id in (select id from package where state = 'to_delete' ); + delete from resource_view where resource_id in ( + select id from resource where package_id in ( + select id from package where state = 'to_delete')); + delete from resource_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from resource where package_id in ( + select id from package where state = 'to_delete'); ''' # Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3) else: sql += ''' - delete from resource_revision where resource_group_id in - (select id from resource_group where package_id in - (select id from package where state = 'to_delete')); - delete from resource where resource_group_id in - (select id from resource_group where package_id in - (select id from package where state = 'to_delete')); - delete from resource_group_revision where package_id in - (select id from package where state = 'to_delete'); - delete from resource_group where package_id in - (select id from package where state = 'to_delete'); + delete from resource_revision where resource_group_id in ( + select id from resource_group where package_id in ( + select id from package where state = 'to_delete')); + delete from resource where resource_group_id in ( + select id from resource_group where package_id in ( + select id from package where state = 'to_delete')); + delete from resource_group_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from resource_group where package_id in ( + select id from package where state = 'to_delete'); ''' # CKAN pre-2.5: authz models were removed in migration 078 if toolkit.check_ckan_version(max_version='2.4.99'): sql += ''' - delete from package_role where package_id in - (select id from package where state = 'to_delete'); - delete from user_object_role where id not in - (select user_object_role_id from package_role) and context = 'Package'; + delete from package_role where package_id in ( + select id from package where state = 'to_delete'); + delete from user_object_role where id not in ( + select user_object_role_id from package_role) + and context = 'Package'; ''' - sql += ''' - delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); - delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object_error where harvest_object_id in ( + select id from harvest_object + where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object_extra where harvest_object_id in ( + select id from harvest_object + where harvest_source_id = '{harvest_source_id}'); delete from harvest_object where harvest_source_id = '{harvest_source_id}'; - delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}'); + delete from harvest_gather_error where harvest_job_id in ( + select id from harvest_job where source_id = '{harvest_source_id}'); delete from harvest_job where source_id = '{harvest_source_id}'; - delete from package_tag_revision where package_id in (select id from package where state = 'to_delete'); - delete from member_revision where table_id in (select id from package where state = 'to_delete'); - delete from package_extra_revision where package_id in (select id from package where state = 'to_delete'); - delete from package_revision where id in (select id from package where state = 'to_delete'); - delete from package_tag where package_id in (select id from package where state = 'to_delete'); - delete from package_extra where package_id in (select id from package where state = 'to_delete'); - delete from package_relationship_revision where subject_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship_revision where object_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship where subject_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship where object_package_id in (select id from package where state = 'to_delete'); - delete from member where table_id in (select id from package where state = 'to_delete'); - delete from related_dataset where dataset_id in (select id from package where state = 'to_delete'); + delete from package_tag_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from member_revision where table_id in ( + select id from package where state = 'to_delete'); + delete from package_extra_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from package_revision where id in ( + select id from package where state = 'to_delete'); + delete from package_tag where package_id in ( + select id from package where state = 'to_delete'); + delete from package_extra where package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where subject_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where object_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship where subject_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship where object_package_id in ( + select id from package where state = 'to_delete'); + delete from member where table_id in ( + select id from package where state = 'to_delete'); + delete from related_dataset where dataset_id in ( + select id from package where state = 'to_delete'); delete from related where id in {related_ids}; - delete from package where id in (select id from package where state = 'to_delete'); + delete from package where id in ( + select id from package where state = 'to_delete'); commit; '''.format( harvest_source_id=harvest_source_id, related_ids=related_ids) @@ -183,7 +211,8 @@ def harvest_source_clear(context, data_dict): return {'id': harvest_source_id} -def harvest_source_index_clear(context,data_dict): + +def harvest_source_index_clear(context, data_dict): ''' Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. This is useful to clean history of long running @@ -193,8 +222,8 @@ def harvest_source_index_clear(context,data_dict): :type id: string ''' - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) + check_access('harvest_source_clear', context, data_dict) + harvest_source_id = data_dict.get('id') source = HarvestSource.get(harvest_source_id) if not source: @@ -204,8 +233,8 @@ def harvest_source_index_clear(context,data_dict): harvest_source_id = source.id conn = make_connection() - query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id, - config.get('ckan.site_id')) + query = ''' +%s:"%s" +site_id:"%s" ''' % ( + 'harvest_source_id', harvest_source_id, config.get('ckan.site_id')) try: conn.delete_query(query) if asbool(config.get('ckan.search.solr_commit', 'true')): @@ -240,17 +269,17 @@ def harvest_objects_import(context, data_dict): :type package_id: string ''' log.info('Harvest objects import: %r', data_dict) - check_access('harvest_objects_import',context,data_dict) + check_access('harvest_objects_import', context, data_dict) model = context['model'] session = context['session'] - source_id = data_dict.get('source_id',None) - harvest_object_id = data_dict.get('harvest_object_id',None) - package_id_or_name = data_dict.get('package_id',None) + source_id = data_dict.get('source_id') + harvest_object_id = data_dict.get('harvest_object_id') + package_id_or_name = data_dict.get('package_id') - segments = context.get('segments',None) + segments = context.get('segments') - join_datasets = context.get('join_datasets',True) + join_datasets = context.get('join_datasets', True) if source_id: source = HarvestSource.get(source_id) @@ -262,43 +291,48 @@ def harvest_objects_import(context, data_dict): log.warn('Harvest source %s is not active.', source_id) raise Exception('This harvest source is not active') - last_objects_ids = session.query(HarvestObject.id) \ - .join(HarvestSource) \ - .filter(HarvestObject.source==source) \ - .filter(HarvestObject.current==True) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .join(HarvestSource) \ + .filter(HarvestObject.source == source) \ + .filter(HarvestObject.current == True) elif harvest_object_id: - last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.id==harvest_object_id) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.id == harvest_object_id) elif package_id_or_name: - last_objects_ids = session.query(HarvestObject.id) \ - .join(Package) \ - .filter(HarvestObject.current==True) \ - .filter(Package.state==u'active') \ - .filter(or_(Package.id==package_id_or_name, - Package.name==package_id_or_name)) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .join(Package) \ + .filter(HarvestObject.current == True) \ + .filter(Package.state == u'active') \ + .filter(or_(Package.id == package_id_or_name, + Package.name == package_id_or_name)) join_datasets = False else: - last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.current==True) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.current == True) if join_datasets: last_objects_ids = last_objects_ids.join(Package) \ - .filter(Package.state==u'active') + .filter(Package.state == u'active') last_objects_ids = last_objects_ids.all() last_objects_count = 0 for obj_id in last_objects_ids: - if segments and str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: + if segments and \ + str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: continue obj = session.query(HarvestObject).get(obj_id) for harvester in PluginImplementations(IHarvester): if harvester.info()['name'] == obj.source.type: - if hasattr(harvester,'force_import'): + if hasattr(harvester, 'force_import'): harvester.force_import = True harvester.import_stage(obj) break @@ -306,7 +340,8 @@ def harvest_objects_import(context, data_dict): log.info('Harvest objects imported: %s', last_objects_count) return last_objects_count -def _caluclate_next_run(frequency): + +def _calculate_next_run(frequency): now = datetime.datetime.utcnow() if frequency == 'ALWAYS': @@ -318,7 +353,7 @@ def _caluclate_next_run(frequency): if frequency == 'DAILY': return now + datetime.timedelta(days=1) if frequency == 'MONTHLY': - if now.month in (4,6,9,11): + if now.month in (4, 6, 9, 11): days = 30 elif now.month == 2: if now.year % 4 == 0: @@ -341,19 +376,20 @@ def _make_scheduled_jobs(context, data_dict): data_dict = {'source_id': source.id} try: get_action('harvest_job_create')(context, data_dict) - except HarvestJobExists, e: + except HarvestJobExists: log.info('Trying to rerun job for %s skipping' % source.id) - source.next_run = _caluclate_next_run(source.frequency) + source.next_run = _calculate_next_run(source.frequency) source.save() -def harvest_jobs_run(context,data_dict): + +def harvest_jobs_run(context, data_dict): log.info('Harvest job run: %r', data_dict) - check_access('harvest_jobs_run',context,data_dict) + check_access('harvest_jobs_run', context, data_dict) session = context['session'] - source_id = data_dict.get('source_id',None) + source_id = data_dict.get('source_id') if not source_id: _make_scheduled_jobs(context, data_dict) @@ -361,38 +397,41 @@ def harvest_jobs_run(context,data_dict): context['return_objects'] = False # Flag finished jobs as such - jobs = harvest_job_list(context,{'source_id':source_id,'status':u'Running'}) + jobs = harvest_job_list( + context, {'source_id': source_id, 'status': u'Running'}) if len(jobs): for job in jobs: if job['gather_finished']: - objects = session.query(HarvestObject.id) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(and_((HarvestObject.state!=u'COMPLETE'), - (HarvestObject.state!=u'ERROR'))) \ - .order_by(HarvestObject.import_finished.desc()) + objects = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(and_((HarvestObject.state != u'COMPLETE'), + (HarvestObject.state != u'ERROR'))) \ + .order_by(HarvestObject.import_finished.desc()) if objects.count() == 0: job_obj = HarvestJob.get(job['id']) job_obj.status = u'Finished' last_object = session.query(HarvestObject) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(HarvestObject.import_finished!=None) \ - .order_by(HarvestObject.import_finished.desc()) \ - .first() + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(HarvestObject.import_finished != None) \ + .order_by(HarvestObject.import_finished.desc()) \ + .first() if last_object: job_obj.finished = last_object.import_finished job_obj.save() # Reindex the harvest source dataset so it has the latest # status - get_action('harvest_source_reindex')(context, - {'id': job_obj.source.id}) + get_action('harvest_source_reindex')( + context, {'id': job_obj.source.id}) # resubmit old redis tasks resubmit_jobs() # Check if there are pending harvest jobs - jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'}) + jobs = harvest_job_list( + context, {'source_id': source_id, 'status': u'New'}) if len(jobs) == 0: log.info('No new harvest jobs.') raise NoNewHarvestJobError('There are no new harvesting jobs') @@ -402,7 +441,7 @@ def harvest_jobs_run(context,data_dict): sent_jobs = [] for job in jobs: context['detailed'] = False - source = harvest_source_show(context,{'id':job['source_id']}) + source = harvest_source_show(context, {'id': job['source_id']}) if source['active']: job_obj = HarvestJob.get(job['id']) job_obj.status = job['status'] = u'Running' @@ -431,7 +470,7 @@ def harvest_job_abort(context, data_dict): model = context['model'] - source_id = data_dict.get('source_id', None) + source_id = data_dict.get('source_id') source = harvest_source_show(context, {'id': source_id}) # HarvestJob set status to 'Finished' @@ -484,20 +523,22 @@ def harvest_sources_reindex(context, data_dict): model = context['model'] packages = model.Session.query(model.Package) \ - .filter(model.Package.type==DATASET_TYPE_NAME) \ - .filter(model.Package.state==u'active') \ + .filter(model.Package.type == DATASET_TYPE_NAME) \ + .filter(model.Package.state == u'active') \ .all() package_index = PackageSearchIndex() reindex_context = {'defer_commit': True} for package in packages: - get_action('harvest_source_reindex')(reindex_context, {'id': package.id}) + get_action('harvest_source_reindex')( + reindex_context, {'id': package.id}) package_index.commit() return True + @logic.side_effect_free def harvest_source_reindex(context, data_dict): '''Reindex a single harvest source''' @@ -508,8 +549,8 @@ def harvest_source_reindex(context, data_dict): if 'extras_as_string'in context: del context['extras_as_string'] context.update({'ignore_auth': True}) - package_dict = logic.get_action('harvest_source_show')(context, - {'id': harvest_source_id}) + package_dict = logic.get_action('harvest_source_show')( + context, {'id': harvest_source_id}) log.debug('Updating search index for harvest source: {0}'.format( package_dict.get('name') or harvest_source_id)) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index 4dec758..5299273 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -14,22 +14,25 @@ from ckan.lib.navl.validators import keep_extras log = logging.getLogger(__name__) + def harvest_source_id_exists(value, context): - result = HarvestSource.get(value,None) + result = HarvestSource.get(value) if not result: raise Invalid('Harvest Source with id %r does not exist.' % str(value)) return value + def harvest_job_exists(value, context): - """Check if a harvest job exists and returns the model if it does""" - result = HarvestJob.get(value, None) + '''Check if a harvest job exists and returns the model if it does''' + result = HarvestJob.get(value) if not result: raise Invalid('Harvest Job with id %r does not exist.' % str(value)) return result + def _normalize_url(url): o = urlparse.urlparse(url) @@ -48,14 +51,15 @@ def _normalize_url(url): path = o.path.rstrip('/') check_url = urlparse.urlunparse(( - o.scheme, - netloc, - path, - None,None,None)) + o.scheme, + netloc, + path, + None, None, None)) return check_url -def harvest_source_url_validator(key,data,errors,context): + +def harvest_source_url_validator(key, data, errors, context): package = context.get("package") if package: @@ -67,22 +71,24 @@ def harvest_source_url_validator(key,data,errors,context): #pkg_id = data.get(('id',),'') q = model.Session.query(model.Package.url, model.Package.state) \ - .filter(model.Package.type==DATASET_TYPE_NAME) + .filter(model.Package.type == DATASET_TYPE_NAME) if package_id: # When editing a source we need to avoid its own URL - q = q.filter(model.Package.id!=package_id) + q = q.filter(model.Package.id != package_id) existing_sources = q.all() for url, state in existing_sources: url = _normalize_url(url) if url == new_url: - raise Invalid('There already is a Harvest Source for this URL: %s' % data[key]) + raise Invalid('There already is a Harvest Source for this URL: %s' + % data[key]) return data[key] -def harvest_source_type_exists(value,context): + +def harvest_source_type_exists(value, context): #TODO: use new description interface # Get all the registered harvester types @@ -90,18 +96,20 @@ def harvest_source_type_exists(value,context): for harvester in PluginImplementations(IHarvester): info = harvester.info() if not info or 'name' not in info: - log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester)) + log.error('Harvester %s does not provide the harvester name in ' + 'the info response' % harvester) continue available_types.append(info['name']) - if not value in available_types: - raise Invalid('Unknown harvester type: %s. Have you registered a harvester for this type?' % value) + raise Invalid('Unknown harvester type: %s. Have you registered a ' + 'harvester for this type?' % value) return value -def harvest_source_config_validator(key,data,errors,context): - harvester_type = data.get(('source_type',),'') + +def harvest_source_config_validator(key, data, errors, context): + harvester_type = data.get(('source_type',), '') for harvester in PluginImplementations(IHarvester): info = harvester.info() if info['name'] == harvester_type: @@ -109,21 +117,24 @@ def harvest_source_config_validator(key,data,errors,context): try: return harvester.validate_config(data[key]) except Exception, e: - raise Invalid('Error parsing the configuration options: %s' % str(e)) + raise Invalid('Error parsing the configuration options: %s' + % e) else: return data[key] + def keep_not_empty_extras(key, data, errors, context): extras = data.pop(key, {}) for extras_key, value in extras.iteritems(): if value: data[key[:-1] + (extras_key,)] = value -def harvest_source_extra_validator(key,data,errors,context): - harvester_type = data.get(('source_type',),'') - #gather all extra fields to use as whitelist of what - #can be added to top level data_dict +def harvest_source_extra_validator(key, data, errors, context): + harvester_type = data.get(('source_type',), '') + + # gather all extra fields to use as whitelist of what + # can be added to top level data_dict all_extra_fields = set() for harvester in PluginImplementations(IHarvester): if not hasattr(harvester, 'extra_schema'): @@ -152,8 +163,8 @@ def harvest_source_extra_validator(key,data,errors,context): for key, value in extra_errors.iteritems(): errors[(key,)] = value - ## need to get config out of extras as __extra runs - ## after rest of validation + # need to get config out of extras as __extra runs + # after rest of validation package_extras = data.get(('extras',), []) for num, extra in enumerate(list(package_extras)): @@ -177,21 +188,24 @@ def harvest_source_extra_validator(key,data,errors,context): if package_extras: data[('extras',)] = package_extras -def harvest_source_convert_from_config(key,data,errors,context): + +def harvest_source_convert_from_config(key, data, errors, context): config = data[key] if config: config_dict = json.loads(config) for key, value in config_dict.iteritems(): data[(key,)] = value -def harvest_source_active_validator(value,context): - if isinstance(value,basestring): + +def harvest_source_active_validator(value, context): + if isinstance(value, basestring): if value.lower() == 'true': return True else: return False return bool(value) + def harvest_source_frequency_exists(value): if value == '': value = 'MANUAL' @@ -205,6 +219,7 @@ def dataset_type_exists(value): value = DATASET_TYPE_NAME return value + def harvest_object_extras_validator(value, context): if not isinstance(value, dict): raise Invalid('extras must be a dict') From 5a5260ff0bc52200ff60c17ddb4ca25cd2a0259e Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 21:42:39 +0000 Subject: [PATCH 13/19] Add test for harvest_source_clear since the PEP8 changes were quite a lot there. --- ckanext/harvest/tests/test_action.py | 56 +++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index 7aed649..674f714 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -2,6 +2,7 @@ import json import copy import factories import unittest +from nose.tools import assert_equal try: from ckan.tests import factories as ckan_factories @@ -127,6 +128,30 @@ class FunctionalTestBaseWithoutClearBetweenTests(object): pass +SOURCE_DICT = { + "url": "http://test.action.com", + "name": "test-source-action", + "title": "Test source action", + "notes": "Test source action desc", + "source_type": "test-for-action", + "frequency": "MANUAL", + "config": json.dumps({"custom_option": ["a", "b"]}) +} + + +class ActionBase(object): + @classmethod + def setup_class(cls): + reset_db() + harvest_model.setup() + if not p.plugin_loaded('test_action_harvester'): + p.load('test_action_harvester') + + @classmethod + def teardown_class(cls): + p.unload('test_action_harvester') + + class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests): @classmethod @@ -136,15 +161,7 @@ class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests): cls.sysadmin = ckan_factories.Sysadmin() - cls.default_source_dict = { - "url": "http://test.action.com", - "name": "test-source-action", - "title": "Test source action", - "notes": "Test source action desc", - "source_type": "test-for-action", - "frequency": "MANUAL", - "config": json.dumps({"custom_option": ["a", "b"]}) - } + cls.default_source_dict = SOURCE_DICT if not p.plugin_loaded('test_action_harvester'): p.load('test_action_harvester') @@ -287,6 +304,27 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase): assert source.type == source_dict['source_type'] +class TestActions(ActionBase): + def test_harvest_source_clear(self): + source = factories.HarvestSourceObj(**SOURCE_DICT) + job = factories.HarvestJobObj(source=source) + dataset = ckan_factories.Dataset() + object_ = factories.HarvestObjectObj(job=job, source=source, + package_id=dataset['id']) + + context = {'model': model, 'session': model.Session, + 'ignore_auth': True, 'user': ''} + result = toolkit.get_action('harvest_source_clear')( + context, {'id': source.id}) + + assert_equal(result, {'id': source.id}) + source = harvest_model.HarvestSource.get(source.id) + assert source + assert_equal(harvest_model.HarvestJob.get(job.id), None) + assert_equal(harvest_model.HarvestObject.get(object_.id), None) + assert_equal(model.Package.get(dataset['id']), None) + + class TestHarvestObject(unittest.TestCase): @classmethod def setup_class(cls): From 10685badb50b545349e863d1851adb75c731b7a2 Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 20:30:11 +0000 Subject: [PATCH 14/19] PEP8 based on #174 Conflicts: ckanext/harvest/logic/action/delete.py ckanext/harvest/logic/action/update.py ckanext/harvest/logic/validators.py --- ckanext/harvest/logic/action/create.py | 6 +- ckanext/harvest/logic/action/delete.py | 2 +- ckanext/harvest/logic/action/update.py | 180 ++++++++++++++----------- ckanext/harvest/logic/validators.py | 44 +++--- 4 files changed, 125 insertions(+), 107 deletions(-) diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py index 203b9eb..d904c5c 100644 --- a/ckanext/harvest/logic/action/create.py +++ b/ckanext/harvest/logic/action/create.py @@ -148,15 +148,15 @@ def _check_for_existing_jobs(context, source_id): return exist def harvest_object_create(context, data_dict): - """ Create a new harvest object + ''' Create a new harvest object :type guid: string (optional) :type content: string (optional) - :type job_id: string + :type job_id: string :type source_id: string (optional) :type package_id: string (optional) :type extras: dict (optional) - """ + ''' check_access('harvest_object_create', context, data_dict) data, errors = _validate(data_dict, harvest_object_create_schema(), context) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 88ae8b9..17751a4 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -2,7 +2,6 @@ import logging from ckan import plugins as p - log = logging.getLogger(__name__) @@ -23,5 +22,6 @@ def harvest_source_delete(context, data_dict): if context.get('clear_source', False): # We need the id, the name won't work. package_dict = p.toolkit.get_action('package_show')(context, data_dict) + p.toolkit.get_action('harvest_source_clear')( context, {'id': package_dict['id']}) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 8280419..4649112 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -29,9 +29,8 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError from ckanext.harvest.logic.dictization import harvest_job_dictize -from ckanext.harvest.logic.action.get import harvest_source_show, \ - harvest_job_list, _get_sources_for_user - +from ckanext.harvest.logic.action.get import ( + harvest_source_show, harvest_job_list, _get_sources_for_user) log = logging.getLogger(__name__) @@ -71,10 +70,8 @@ def harvest_source_update(context, data_dict): type. Should be a serialized as JSON. (optional) :type config: string - :returns: the newly created harvest source :rtype: dictionary - ''' log.info('Updating harvest source: %r', data_dict) @@ -112,11 +109,11 @@ def harvest_source_clear(context, data_dict): model = context['model'] - sql = "select id from related where id in (select related_id from " \ - "related_dataset where dataset_id in (select package_id from " \ - "harvest_object where harvest_source_id = " \ - "'{harvest_source_id}'));".format( - harvest_source_id=harvest_source_id) + sql = '''select id from related where id in ( + select related_id from related_dataset where dataset_id in ( + select package_id from harvest_object + where harvest_source_id = '{harvest_source_id}'));'''.format( + harvest_source_id=harvest_source_id) result = model.Session.execute(sql) ids = [] for row in result: @@ -124,60 +121,84 @@ def harvest_source_clear(context, data_dict): related_ids = "('" + "','".join(ids) + "')" sql = '''begin; - update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format( + update package set state = 'to_delete' where id in ( + select package_id from harvest_object + where harvest_source_id = '{harvest_source_id}');'''.format( harvest_source_id=harvest_source_id) # CKAN-2.3 or above: delete resource views, resource revisions & resources if toolkit.check_ckan_version(min_version='2.3'): sql += ''' - delete from resource_view where resource_id in (select id from resource where package_id in (select id from package where state = 'to_delete' )); - delete from resource_revision where package_id in (select id from package where state = 'to_delete' ); - delete from resource where package_id in (select id from package where state = 'to_delete' ); + delete from resource_view where resource_id in ( + select id from resource where package_id in ( + select id from package where state = 'to_delete')); + delete from resource_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from resource where package_id in ( + select id from package where state = 'to_delete'); ''' # Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3) else: sql += ''' - delete from resource_revision where resource_group_id in - (select id from resource_group where package_id in - (select id from package where state = 'to_delete')); - delete from resource where resource_group_id in - (select id from resource_group where package_id in - (select id from package where state = 'to_delete')); - delete from resource_group_revision where package_id in - (select id from package where state = 'to_delete'); - delete from resource_group where package_id in - (select id from package where state = 'to_delete'); + delete from resource_revision where resource_group_id in ( + select id from resource_group where package_id in ( + select id from package where state = 'to_delete')); + delete from resource where resource_group_id in ( + select id from resource_group where package_id in ( + select id from package where state = 'to_delete')); + delete from resource_group_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from resource_group where package_id in ( + select id from package where state = 'to_delete'); ''' # CKAN pre-2.5: authz models were removed in migration 078 if toolkit.check_ckan_version(max_version='2.4.99'): sql += ''' - delete from package_role where package_id in - (select id from package where state = 'to_delete'); - delete from user_object_role where id not in - (select user_object_role_id from package_role) and context = 'Package'; + delete from package_role where package_id in ( + select id from package where state = 'to_delete'); + delete from user_object_role where id not in ( + select user_object_role_id from package_role) + and context = 'Package'; ''' - sql += ''' - delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); - delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object_error where harvest_object_id in ( + select id from harvest_object + where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object_extra where harvest_object_id in ( + select id from harvest_object + where harvest_source_id = '{harvest_source_id}'); delete from harvest_object where harvest_source_id = '{harvest_source_id}'; - delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}'); + delete from harvest_gather_error where harvest_job_id in ( + select id from harvest_job where source_id = '{harvest_source_id}'); delete from harvest_job where source_id = '{harvest_source_id}'; - delete from package_tag_revision where package_id in (select id from package where state = 'to_delete'); - delete from member_revision where table_id in (select id from package where state = 'to_delete'); - delete from package_extra_revision where package_id in (select id from package where state = 'to_delete'); - delete from package_revision where id in (select id from package where state = 'to_delete'); - delete from package_tag where package_id in (select id from package where state = 'to_delete'); - delete from package_extra where package_id in (select id from package where state = 'to_delete'); - delete from package_relationship_revision where subject_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship_revision where object_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship where subject_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship where object_package_id in (select id from package where state = 'to_delete'); - delete from member where table_id in (select id from package where state = 'to_delete'); - delete from related_dataset where dataset_id in (select id from package where state = 'to_delete'); + delete from package_tag_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from member_revision where table_id in ( + select id from package where state = 'to_delete'); + delete from package_extra_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from package_revision where id in ( + select id from package where state = 'to_delete'); + delete from package_tag where package_id in ( + select id from package where state = 'to_delete'); + delete from package_extra where package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where subject_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where object_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship where subject_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship where object_package_id in ( + select id from package where state = 'to_delete'); + delete from member where table_id in ( + select id from package where state = 'to_delete'); + delete from related_dataset where dataset_id in ( + select id from package where state = 'to_delete'); delete from related where id in {related_ids}; - delete from package where id in (select id from package where state = 'to_delete'); + delete from package where id in ( + select id from package where state = 'to_delete'); commit; '''.format( harvest_source_id=harvest_source_id, related_ids=related_ids) @@ -190,7 +211,7 @@ def harvest_source_clear(context, data_dict): return {'id': harvest_source_id} -def harvest_source_index_clear(context,data_dict): +def harvest_source_index_clear(context, data_dict): ''' Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. This is useful to clean history of long running @@ -199,8 +220,8 @@ def harvest_source_index_clear(context,data_dict): :type id: string ''' - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) + check_access('harvest_source_clear', context, data_dict) + harvest_source_id = data_dict.get('id') source = HarvestSource.get(harvest_source_id) if not source: @@ -210,8 +231,8 @@ def harvest_source_index_clear(context,data_dict): harvest_source_id = source.id conn = make_connection() - query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id, - config.get('ckan.site_id')) + query = ''' +%s:"%s" +site_id:"%s" ''' % ( + 'harvest_source_id', harvest_source_id, config.get('ckan.site_id')) try: conn.delete_query(query) if asbool(config.get('ckan.search.solr_commit', 'true')): @@ -279,7 +300,7 @@ def harvest_objects_import(context, data_dict): model = context['model'] session = context['session'] - source_id = data_dict.get('source_id',) + source_id = data_dict.get('source_id') harvest_object_id = data_dict.get('harvest_object_id') package_id_or_name = data_dict.get('package_id') @@ -297,25 +318,29 @@ def harvest_objects_import(context, data_dict): log.warn('Harvest source %s is not active.', source_id) raise Exception('This harvest source is not active') - last_objects_ids = session.query(HarvestObject.id) \ - .join(HarvestSource) \ - .filter(HarvestObject.source == source) \ - .filter(HarvestObject.current == True) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .join(HarvestSource) \ + .filter(HarvestObject.source == source) \ + .filter(HarvestObject.current == True) elif harvest_object_id: - last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.id == harvest_object_id) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.id == harvest_object_id) elif package_id_or_name: - last_objects_ids = session.query(HarvestObject.id) \ - .join(Package) \ - .filter(HarvestObject.current == True) \ - .filter(Package.state == u'active') \ - .filter(or_(Package.id == package_id_or_name, - Package.name == package_id_or_name)) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .join(Package) \ + .filter(HarvestObject.current == True) \ + .filter(Package.state == u'active') \ + .filter(or_(Package.id == package_id_or_name, + Package.name == package_id_or_name)) join_datasets = False else: - last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.current == True) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.current == True) if join_datasets: last_objects_ids = last_objects_ids.join(Package) \ @@ -327,7 +352,7 @@ def harvest_objects_import(context, data_dict): for obj_id in last_objects_ids: if segments and \ - str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: + str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: continue obj = session.query(HarvestObject).get(obj_id) @@ -343,7 +368,7 @@ def harvest_objects_import(context, data_dict): return last_objects_count -def _caluclate_next_run(frequency): +def _calculate_next_run(frequency): now = datetime.datetime.utcnow() if frequency == 'ALWAYS': @@ -378,10 +403,10 @@ def _make_scheduled_jobs(context, data_dict): data_dict = {'source_id': source.id} try: get_action('harvest_job_create')(context, data_dict) - except HarvestJobExists, e: + except HarvestJobExists: log.info('Trying to rerun job for %s skipping' % source.id) - source.next_run = _caluclate_next_run(source.frequency) + source.next_run = _calculate_next_run(source.frequency) source.save() @@ -404,11 +429,12 @@ def harvest_jobs_run(context, data_dict): if len(jobs): for job in jobs: if job['gather_finished']: - objects = session.query(HarvestObject.id) \ - .filter(HarvestObject.harvest_job_id == job['id']) \ - .filter(and_((HarvestObject.state != u'COMPLETE'), - (HarvestObject.state != u'ERROR'))) \ - .order_by(HarvestObject.import_finished.desc()) + objects = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(and_((HarvestObject.state != u'COMPLETE'), + (HarvestObject.state != u'ERROR'))) \ + .order_by(HarvestObject.import_finished.desc()) if objects.count() == 0: job_obj = HarvestJob.get(job['id']) @@ -471,7 +497,7 @@ def harvest_job_abort(context, data_dict): model = context['model'] - source_id = data_dict.get('source_id', None) + source_id = data_dict.get('source_id') source = harvest_source_show(context, {'id': source_id}) # HarvestJob set status to 'Finished' @@ -550,8 +576,8 @@ def harvest_source_reindex(context, data_dict): if 'extras_as_string'in context: del context['extras_as_string'] context.update({'ignore_auth': True}) - package_dict = logic.get_action('harvest_source_show')(context, - {'id': harvest_source_id}) + package_dict = logic.get_action('harvest_source_show')( + context, {'id': harvest_source_id}) log.debug('Updating search index for harvest source: {0}'.format( package_dict.get('name') or harvest_source_id)) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index 01e73ef..bb1ab12 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -17,7 +17,7 @@ log = logging.getLogger(__name__) def harvest_source_id_exists(value, context): - result = HarvestSource.get(value, None) + result = HarvestSource.get(value) if not result: raise Invalid('Harvest Source with id %r does not exist.' % str(value)) @@ -26,8 +26,7 @@ def harvest_source_id_exists(value, context): def harvest_job_exists(value, context): '''Check if a harvest job exists and returns the model if it does''' - - result = HarvestJob.get(value, None) + result = HarvestJob.get(value) if not result: raise Invalid('Harvest Job with id %r does not exist.' % str(value)) @@ -52,10 +51,10 @@ def _normalize_url(url): path = o.path.rstrip('/') check_url = urlparse.urlunparse(( - o.scheme, - netloc, - path, - None, None, None)) + o.scheme, + netloc, + path, + None, None, None)) return check_url @@ -80,12 +79,11 @@ def harvest_source_url_validator(key, data, errors, context): new_url = _normalize_url(data[key]) - q = model.Session.query( - model.Package.id, model.Package.url) \ - .filter(model.Package.type == DATASET_TYPE_NAME) + q = model.Session.query(model.Package.id, model.Package.url) \ + .filter(model.Package.type == DATASET_TYPE_NAME) if package_id: - # When editing a source we need to avoid its own URL. + # When editing a source we need to avoid its own URL q = q.filter(model.Package.id != package_id) existing_sources = q.all() @@ -102,10 +100,8 @@ def harvest_source_url_validator(key, data, errors, context): if url == new_url and conf == new_config: # You can have a duplicate URL if it's pointing to a unique # set as it will be harvesting unique datasets. - raise Invalid( - 'There already is a Harvest Source for this URL: %s' - % data[key] - ) + raise Invalid('There already is a Harvest Source for this URL: %s' + % data[key]) return data[key] @@ -118,18 +114,14 @@ def harvest_source_type_exists(value, context): for harvester in PluginImplementations(IHarvester): info = harvester.info() if not info or 'name' not in info: - log.error( - 'Harvester %r does not provide the harvester name in the info ' - 'response' % str(harvester) - ) + log.error('Harvester %s does not provide the harvester name in ' + 'the info response' % harvester) continue available_types.append(info['name']) - if value not in available_types: - raise Invalid( - 'Unknown harvester type: %s. Have you registered a harvester for ' - 'this type?' % value - ) + if not value in available_types: + raise Invalid('Unknown harvester type: %s. Have you registered a ' + 'harvester for this type?' % value) return value @@ -143,8 +135,8 @@ def harvest_source_config_validator(key, data, errors, context): try: return harvester.validate_config(data[key]) except Exception, e: - raise Invalid( - 'Error parsing the configuration options: %s' % str(e)) + raise Invalid('Error parsing the configuration options: %s' + % e) else: return data[key] From 8c1f7619cba31fd09244dc595022f9b8d7037a5f Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 22:08:46 +0000 Subject: [PATCH 15/19] Fix code style to be more ckan-like whilst still pep8. --- ckanext/harvest/logic/action/delete.py | 8 +++++--- ckanext/harvest/logic/action/update.py | 3 +-- ckanext/harvest/logic/validators.py | 6 +++--- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 17751a4..b2dcfb3 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -6,21 +6,23 @@ log = logging.getLogger(__name__) def harvest_source_delete(context, data_dict): - '''Deletes an existing harvest source. + '''Deletes an existing harvest source This method just proxies the request to package_delete, which will delete the actual harvest type dataset and the HarvestSource object (via the after_delete extension point). :param id: the name or id of the harvest source to delete + :type id: string ''' - log.info('Deleting harvest source: %r', data_dict) p.toolkit.check_access('harvest_source_delete', context, data_dict) + p.toolkit.get_action('package_delete')(context, data_dict) + if context.get('clear_source', False): - # We need the id, the name won't work. + # We need the id. The name won't work. package_dict = p.toolkit.get_action('package_show')(context, data_dict) p.toolkit.get_action('harvest_source_clear')( diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index f8dbaa8..23318bf 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -266,7 +266,6 @@ def harvest_objects_import(context, data_dict): :param package_id: the id or name of the package to import :type package_id: string ''' - log.info('Harvest objects import: %r', data_dict) check_access('harvest_objects_import', context, data_dict) @@ -540,7 +539,7 @@ def harvest_sources_reindex(context, data_dict): @logic.side_effect_free def harvest_source_reindex(context, data_dict): - '''Reindex a single harvest source.''' + '''Reindex a single harvest source''' harvest_source_id = logic.get_or_bust(data_dict, 'id') defer_commit = context.get('defer_commit', False) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index bb1ab12..d8c6a91 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -60,17 +60,17 @@ def _normalize_url(url): def harvest_source_url_validator(key, data, errors, context): - '''Validate the provided harvest source URL. + '''Validate the provided harvest source URL Checks that the URL is not already existing with the same config. ''' - package = context.get("package") + package = context.get('package') if package: package_id = package.id else: - package_id = data.get(key[:-1] + ("id",)) + package_id = data.get(key[:-1] + ('id',)) try: new_config = data.get(key[:-1] + ('config',)) From 1ccb0dae44e11a3420a1c4fbe9129e347dd9375a Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 22:12:49 +0000 Subject: [PATCH 16/19] A few more PEP8 changes taken from #174. --- ckanext/harvest/logic/action/delete.py | 9 ++------- ckanext/harvest/logic/action/update.py | 1 - ckanext/harvest/logic/validators.py | 5 ++--- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 36eff6b..70e8c0a 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -6,8 +6,7 @@ log = logging.getLogger(__name__) def harvest_source_delete(context, data_dict): - ''' - Deletes an existing harvest source + '''Deletes an existing harvest source This method just proxies the request to package_delete, which will delete the actual harvest type dataset and the @@ -15,10 +14,6 @@ def harvest_source_delete(context, data_dict): :param id: the name or id of the harvest source to delete :type id: string - - :returns: the newly created harvest source - :rtype: dictionary - ''' log.info('Deleting harvest source: %r', data_dict) @@ -28,7 +23,7 @@ def harvest_source_delete(context, data_dict): if context.get('clear_source', False): - # We need the id, the name won't work + # We need the id. The name won't work. package_dict = p.toolkit.get_action('package_show')(context, data_dict) p.toolkit.get_action('harvest_source_clear')( diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index b51f37f..bd06d2c 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -91,7 +91,6 @@ def harvest_source_clear(context, data_dict): :param id: the id of the harvest source to clear :type id: string - ''' check_access('harvest_source_clear', context, data_dict) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index 5299273..5ce079b 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -68,7 +68,6 @@ def harvest_source_url_validator(key, data, errors, context): package_id = data.get(key[:-1] + ("id",)) new_url = _normalize_url(data[key]) - #pkg_id = data.get(('id',),'') q = model.Session.query(model.Package.url, model.Package.state) \ .filter(model.Package.type == DATASET_TYPE_NAME) @@ -89,7 +88,7 @@ def harvest_source_url_validator(key, data, errors, context): def harvest_source_type_exists(value, context): - #TODO: use new description interface + # TODO: use new description interface # Get all the registered harvester types available_types = [] @@ -153,7 +152,7 @@ def harvest_source_extra_validator(key, data, errors, context): extra_data, extra_errors = validate(data.get(key, {}), extra_schema) for key in extra_data.keys(): - #only allow keys that appear in at least one harvester + # only allow keys that appear in at least one harvester if key not in all_extra_fields: extra_data.pop(key) From 77e5b89a01fdf0cb901c88245d6a5e37183b99a3 Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 22:23:04 +0000 Subject: [PATCH 17/19] Blank line needed. --- ckanext/harvest/logic/action/delete.py | 1 + ckanext/harvest/logic/action/update.py | 1 + 2 files changed, 2 insertions(+) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index b2dcfb3..70e8c0a 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -22,6 +22,7 @@ def harvest_source_delete(context, data_dict): p.toolkit.get_action('package_delete')(context, data_dict) if context.get('clear_source', False): + # We need the id. The name won't work. package_dict = p.toolkit.get_action('package_show')(context, data_dict) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 23318bf..bd06d2c 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -216,6 +216,7 @@ def harvest_source_index_clear(context, data_dict): Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. This is useful to clean history of long running harvest sources to start again fresh. + :param id: the id of the harvest source to clear :type id: string ''' From 5fba056c59a4ac0c8254b1649ff47b1af68b77f2 Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 23:19:05 +0000 Subject: [PATCH 18/19] [#184] Add tests --- ckanext/harvest/logic/validators.py | 12 +++---- ckanext/harvest/tests/test_action.py | 52 ++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index d8c6a91..d12be67 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -62,7 +62,7 @@ def _normalize_url(url): def harvest_source_url_validator(key, data, errors, context): '''Validate the provided harvest source URL - Checks that the URL is not already existing with the same config. + Checks that the URL & config combination are unique to this HarvestSource. ''' package = context.get('package') @@ -88,20 +88,18 @@ def harvest_source_url_validator(key, data, errors, context): existing_sources = q.all() - for uid, url in existing_sources: + for id_, url in existing_sources: url = _normalize_url(url) conf = model.Session.query(HarvestSource.config).filter( - HarvestSource.id == uid).first() + HarvestSource.id == id_).first() if conf: conf = conf[0] else: conf = None if url == new_url and conf == new_config: - # You can have a duplicate URL if it's pointing to a unique - # set as it will be harvesting unique datasets. - raise Invalid('There already is a Harvest Source for this URL: %s' - % data[key]) + raise Invalid('There already is a Harvest Source for this URL (& ' + 'config): url=%s config=%s' % (new_url, new_config)) return data[key] diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index 674f714..7f6a7aa 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -2,7 +2,7 @@ import json import copy import factories import unittest -from nose.tools import assert_equal +from nose.tools import assert_equal, assert_raises try: from ckan.tests import factories as ckan_factories @@ -13,6 +13,7 @@ except ImportError: from ckan import plugins as p from ckan.plugins import toolkit from ckan import model +import ckan.lib.search as search from ckanext.harvest.interfaces import IHarvester import ckanext.harvest.model as harvest_model @@ -142,11 +143,14 @@ SOURCE_DICT = { class ActionBase(object): @classmethod def setup_class(cls): - reset_db() - harvest_model.setup() if not p.plugin_loaded('test_action_harvester'): p.load('test_action_harvester') + def setup(self): + reset_db() + search.clear_all() + harvest_model.setup() + @classmethod def teardown_class(cls): p.unload('test_action_harvester') @@ -324,6 +328,48 @@ class TestActions(ActionBase): assert_equal(harvest_model.HarvestObject.get(object_.id), None) assert_equal(model.Package.get(dataset['id']), None) + def test_harvest_source_create_twice_with_unique_url(self): + # don't use factory because it looks for the existing source + data_dict = SOURCE_DICT + site_user = toolkit.get_action('get_site_user')( + {'model': model, 'ignore_auth': True}, {})['name'] + + toolkit.get_action('harvest_source_create')( + {'user': site_user}, data_dict) + + data_dict['name'] = 'another-source1' + data_dict['url'] = 'http://another-url' + toolkit.get_action('harvest_source_create')( + {'user': site_user}, data_dict) + + def test_harvest_source_create_twice_with_same_url(self): + # don't use factory because it looks for the existing source + data_dict = SOURCE_DICT + site_user = toolkit.get_action('get_site_user')( + {'model': model, 'ignore_auth': True}, {})['name'] + + toolkit.get_action('harvest_source_create')( + {'user': site_user}, data_dict) + + data_dict['name'] = 'another-source2' + assert_raises(toolkit.ValidationError, + toolkit.get_action('harvest_source_create'), + {'user': site_user}, data_dict) + + def test_harvest_source_create_twice_with_unique_url_and_config(self): + # don't use factory because it looks for the existing source + data_dict = SOURCE_DICT + site_user = toolkit.get_action('get_site_user')( + {'model': model, 'ignore_auth': True}, {})['name'] + + toolkit.get_action('harvest_source_create')( + {'user': site_user}, data_dict) + + data_dict['name'] = 'another-source3' + data_dict['config'] = '{"something": "new"}' + toolkit.get_action('harvest_source_create')( + {'user': site_user}, data_dict) + class TestHarvestObject(unittest.TestCase): @classmethod From f9da3654f89549bfaad7fd48c3584df9032391c0 Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 3 Nov 2015 23:27:52 +0000 Subject: [PATCH 19/19] [#184] Fix tests for older ckan versions. --- ckanext/harvest/tests/test_action.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ckanext/harvest/tests/test_action.py b/ckanext/harvest/tests/test_action.py index 7f6a7aa..24127b0 100644 --- a/ckanext/harvest/tests/test_action.py +++ b/ckanext/harvest/tests/test_action.py @@ -148,7 +148,6 @@ class ActionBase(object): def setup(self): reset_db() - search.clear_all() harvest_model.setup() @classmethod