From 02b81187dfcb91783dcbcb06c1a503791d7c3537 Mon Sep 17 00:00:00 2001 From: Mark Winterbottom Date: Fri, 30 Oct 2015 15:15:41 +0000 Subject: [PATCH] Fixed bug with deleting harvest source's which have a custom configuration. Added PEP-8 compliance. --- ckanext/harvest/logic/action/delete.py | 15 ++- ckanext/harvest/logic/action/update.py | 157 ++++++++++++++----------- 2 files changed, 98 insertions(+), 74 deletions(-) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 405aa3a..3e34aff 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -5,8 +5,9 @@ from ckan import plugins as p log = logging.getLogger(__name__) + def harvest_source_delete(context, data_dict): - ''' + """ Deletes an existing harvest source This method just proxies the request to package_delete, @@ -19,16 +20,14 @@ def harvest_source_delete(context, data_dict): :returns: the newly created harvest source :rtype: dictionary - ''' + """ + log.info('Deleting harvest source: %r', data_dict) p.toolkit.check_access('harvest_source_delete', context, data_dict) - p.toolkit.get_action('package_delete')(context, data_dict) - if context.get('clear_source', False): - - # We need the id, the name won't work + # We need the id, the name won't work. package_dict = p.toolkit.get_action('package_show')(context, data_dict) - - p.toolkit.get_action('harvest_source_clear')(context, {'id': package_dict['id']}) + p.toolkit.get_action('harvest_source_clear')( + context, {'id': package_dict['id']}) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 9e871d9..75d99d5 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -29,12 +29,14 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError from ckanext.harvest.logic.schema import harvest_source_show_package_schema -from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user +from ckanext.harvest.logic.action.get import harvest_source_show, \ + harvest_job_list, _get_sources_for_user log = logging.getLogger(__name__) -def harvest_source_update(context,data_dict): + +def harvest_source_update(context, data_dict): ''' Updates an existing harvest source @@ -83,18 +85,22 @@ def harvest_source_update(context,data_dict): return source -def harvest_source_clear(context,data_dict): + +def harvest_source_clear(context, data_dict): ''' - Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. - This is useful to clean history of long running harvest sources to start again fresh. + Clears all datasets, jobs and objects related to a harvest source, but + keeps the source itself. + This is useful to clean history of long running harvest sources to start + again fresh. :param id: the id of the harvest source to clear :type id: string ''' - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) + check_access('harvest_source_clear', context, data_dict) + + harvest_source_id = data_dict.get('id', None) source = HarvestSource.get(harvest_source_id) if not source: @@ -108,14 +114,18 @@ def harvest_source_clear(context,data_dict): model = context['model'] - sql = "select id from related where id in (select related_id from related_dataset where dataset_id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}'));".format(harvest_source_id=harvest_source_id) + sql = "select id from related where id in (select related_id from " \ + "related_dataset where dataset_id in (select package_id from " \ + "harvest_object where harvest_source_id = " \ + "'{harvest_source_id}'));".format( + harvest_source_id=harvest_source_id) result = model.Session.execute(sql) ids = [] for row in result: ids.append(row[0]) related_ids = "('" + "','".join(ids) + "')" - sql = '''begin; + sql = '''begin; update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format( harvest_source_id=harvest_source_id) @@ -129,15 +139,15 @@ def harvest_source_clear(context,data_dict): # Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3) else: sql += ''' - delete from resource_revision where resource_group_id in - (select id from resource_group where package_id in + delete from resource_revision where resource_group_id in + (select id from resource_group where package_id in (select id from package where state = 'to_delete')); - delete from resource where resource_group_id in - (select id from resource_group where package_id in + delete from resource where resource_group_id in + (select id from resource_group where package_id in (select id from package where state = 'to_delete')); - delete from resource_group_revision where package_id in + delete from resource_group_revision where package_id in (select id from package where state = 'to_delete'); - delete from resource_group where package_id in + delete from resource_group where package_id in (select id from package where state = 'to_delete'); ''' # CKAN pre-2.5: authz models were removed in migration 078 @@ -181,10 +191,11 @@ def harvest_source_clear(context,data_dict): return {'id': harvest_source_id} -def harvest_source_index_clear(context,data_dict): - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) +def harvest_source_index_clear(context, data_dict): + + check_access('harvest_source_clear', context, data_dict) + harvest_source_id = data_dict.get('id', None) source = HarvestSource.get(harvest_source_id) if not source: @@ -194,8 +205,8 @@ def harvest_source_index_clear(context,data_dict): harvest_source_id = source.id conn = make_connection() - query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id, - config.get('ckan.site_id')) + query = ''' +%s:"%s" +site_id:"%s" ''' % ( + 'harvest_source_id', harvest_source_id, config.get('ckan.site_id')) try: conn.delete_query(query) if asbool(config.get('ckan.search.solr_commit', 'true')): @@ -208,7 +219,8 @@ def harvest_source_index_clear(context,data_dict): return {'id': harvest_source_id} -def harvest_objects_import(context,data_dict): + +def harvest_objects_import(context, data_dict): ''' Reimports the current harvest objects It performs the import stage with the last fetched objects, optionally @@ -217,18 +229,19 @@ def harvest_objects_import(context,data_dict): It will only affect the last fetched objects already present in the database. ''' + log.info('Harvest objects import: %r', data_dict) - check_access('harvest_objects_import',context,data_dict) + check_access('harvest_objects_import', context, data_dict) model = context['model'] session = context['session'] - source_id = data_dict.get('source_id',None) - harvest_object_id = data_dict.get('harvest_object_id',None) - package_id_or_name = data_dict.get('package_id',None) + source_id = data_dict.get('source_id', None) + harvest_object_id = data_dict.get('harvest_object_id', None) + package_id_or_name = data_dict.get('package_id', None) - segments = context.get('segments',None) + segments = context.get('segments', None) - join_datasets = context.get('join_datasets',True) + join_datasets = context.get('join_datasets', rue) if source_id: source = HarvestSource.get(source_id) @@ -241,42 +254,43 @@ def harvest_objects_import(context,data_dict): raise Exception('This harvest source is not active') last_objects_ids = session.query(HarvestObject.id) \ - .join(HarvestSource) \ - .filter(HarvestObject.source==source) \ - .filter(HarvestObject.current==True) + .join(HarvestSource) \ + .filter(HarvestObject.source == source) \ + .filter(HarvestObject.current == True) elif harvest_object_id: last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.id==harvest_object_id) + .filter(HarvestObject.id == harvest_object_id) elif package_id_or_name: last_objects_ids = session.query(HarvestObject.id) \ .join(Package) \ - .filter(HarvestObject.current==True) \ - .filter(Package.state==u'active') \ - .filter(or_(Package.id==package_id_or_name, - Package.name==package_id_or_name)) + .filter(HarvestObject.current == True) \ + .filter(Package.state == u'active') \ + .filter(or_(Package.id == package_id_or_name, + Package.name == package_id_or_name)) join_datasets = False else: last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.current==True) + .filter(HarvestObject.current == True) if join_datasets: last_objects_ids = last_objects_ids.join(Package) \ - .filter(Package.state==u'active') + .filter(Package.state == u'active') last_objects_ids = last_objects_ids.all() last_objects_count = 0 for obj_id in last_objects_ids: - if segments and str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: + if segments and \ + str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: continue obj = session.query(HarvestObject).get(obj_id) for harvester in PluginImplementations(IHarvester): if harvester.info()['name'] == obj.source.type: - if hasattr(harvester,'force_import'): + if hasattr(harvester, 'force_import'): harvester.force_import = True harvester.import_stage(obj) break @@ -284,6 +298,7 @@ def harvest_objects_import(context,data_dict): log.info('Harvest objects imported: %s', last_objects_count) return last_objects_count + def _caluclate_next_run(frequency): now = datetime.datetime.utcnow() @@ -296,7 +311,7 @@ def _caluclate_next_run(frequency): if frequency == 'DAILY': return now + datetime.timedelta(days=1) if frequency == 'MONTHLY': - if now.month in (4,6,9,11): + if now.month in (4, 6, 9, 11): days = 30 elif now.month == 2: if now.year % 4 == 0: @@ -325,13 +340,14 @@ def _make_scheduled_jobs(context, data_dict): source.next_run = _caluclate_next_run(source.frequency) source.save() -def harvest_jobs_run(context,data_dict): + +def harvest_jobs_run(context, data_dict): log.info('Harvest job run: %r', data_dict) - check_access('harvest_jobs_run',context,data_dict) + check_access('harvest_jobs_run', context, data_dict) session = context['session'] - source_id = data_dict.get('source_id',None) + source_id = data_dict.get('source_id', None) if not source_id: _make_scheduled_jobs(context, data_dict) @@ -339,14 +355,15 @@ def harvest_jobs_run(context,data_dict): context['return_objects'] = False # Flag finished jobs as such - jobs = harvest_job_list(context,{'source_id':source_id,'status':u'Running'}) + jobs = harvest_job_list( + context, {'source_id': source_id, 'status': u'Running'}) if len(jobs): for job in jobs: if job['gather_finished']: objects = session.query(HarvestObject.id) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(and_((HarvestObject.state!=u'COMPLETE'), - (HarvestObject.state!=u'ERROR'))) \ + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(and_((HarvestObject.state != u'COMPLETE'), + (HarvestObject.state != u'ERROR'))) \ .order_by(HarvestObject.import_finished.desc()) if objects.count() == 0: @@ -354,23 +371,24 @@ def harvest_jobs_run(context,data_dict): job_obj.status = u'Finished' last_object = session.query(HarvestObject) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(HarvestObject.import_finished!=None) \ - .order_by(HarvestObject.import_finished.desc()) \ - .first() + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(HarvestObject.import_finished != None) \ + .order_by(HarvestObject.import_finished.desc()) \ + .first() if last_object: job_obj.finished = last_object.import_finished job_obj.save() # Reindex the harvest source dataset so it has the latest # status - get_action('harvest_source_reindex')(context, - {'id': job_obj.source.id}) + get_action('harvest_source_reindex')( + context, {'id': job_obj.source.id}) # resubmit old redis tasks resubmit_jobs() # Check if there are pending harvest jobs - jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'}) + jobs = harvest_job_list( + context, {'source_id': source_id, 'status': u'New'}) if len(jobs) == 0: log.info('No new harvest jobs.') raise NoNewHarvestJobError('There are no new harvesting jobs') @@ -380,7 +398,7 @@ def harvest_jobs_run(context,data_dict): sent_jobs = [] for job in jobs: context['detailed'] = False - source = harvest_source_show(context,{'id':job['source_id']}) + source = harvest_source_show(context, {'id': job['source_id']}) if source['active']: job_obj = HarvestJob.get(job['id']) job_obj.status = job['status'] = u'Running' @@ -404,23 +422,25 @@ def harvest_sources_reindex(context, data_dict): model = context['model'] packages = model.Session.query(model.Package) \ - .filter(model.Package.type==DATASET_TYPE_NAME) \ - .filter(model.Package.state==u'active') \ + .filter(model.Package.type == DATASET_TYPE_NAME) \ + .filter(model.Package.state == u'active') \ .all() package_index = PackageSearchIndex() reindex_context = {'defer_commit': True} for package in packages: - get_action('harvest_source_reindex')(reindex_context, {'id': package.id}) + get_action('harvest_source_reindex')( + reindex_context, {'id': package.id}) package_index.commit() return True + @logic.side_effect_free def harvest_source_reindex(context, data_dict): - '''Reindex a single harvest source''' + """Reindex a single harvest source.""" harvest_source_id = logic.get_or_bust(data_dict, 'id') defer_commit = context.get('defer_commit', False) @@ -428,18 +448,23 @@ def harvest_source_reindex(context, data_dict): if 'extras_as_string'in context: del context['extras_as_string'] context.update({'ignore_auth': True}) - package_dict = logic.get_action('harvest_source_show')(context, - {'id': harvest_source_id}) - log.debug('Updating search index for harvest source {0}'.format(harvest_source_id)) + package_dict = logic.get_action('harvest_source_show')( + context, {'id': harvest_source_id}) + log.debug('Updating search index for harvest source {0}'.format( + harvest_source_id)) # Remove configuration values new_dict = {} - if package_dict.get('config'): + if package_dict.get('config', None): config = json.loads(package_dict['config']) for key, value in package_dict.iteritems(): - if key not in config: - new_dict[key] = value + if value: + if value and key not in config: + new_dict[key] = value + package_index = PackageSearchIndex() - package_index.index_package(new_dict, defer_commit=defer_commit) + package_index.index_package( + new_dict, + defer_commit=defer_commit) return True