diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py index 203b9eb..d904c5c 100644 --- a/ckanext/harvest/logic/action/create.py +++ b/ckanext/harvest/logic/action/create.py @@ -148,15 +148,15 @@ def _check_for_existing_jobs(context, source_id): return exist def harvest_object_create(context, data_dict): - """ Create a new harvest object + ''' Create a new harvest object :type guid: string (optional) :type content: string (optional) - :type job_id: string + :type job_id: string :type source_id: string (optional) :type package_id: string (optional) :type extras: dict (optional) - """ + ''' check_access('harvest_object_create', context, data_dict) data, errors = _validate(data_dict, harvest_object_create_schema(), context) diff --git a/ckanext/harvest/logic/action/delete.py b/ckanext/harvest/logic/action/delete.py index 88ae8b9..17751a4 100644 --- a/ckanext/harvest/logic/action/delete.py +++ b/ckanext/harvest/logic/action/delete.py @@ -2,7 +2,6 @@ import logging from ckan import plugins as p - log = logging.getLogger(__name__) @@ -23,5 +22,6 @@ def harvest_source_delete(context, data_dict): if context.get('clear_source', False): # We need the id, the name won't work. 
package_dict = p.toolkit.get_action('package_show')(context, data_dict) + p.toolkit.get_action('harvest_source_clear')( context, {'id': package_dict['id']}) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 8280419..4649112 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -29,9 +29,8 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError from ckanext.harvest.logic.dictization import harvest_job_dictize -from ckanext.harvest.logic.action.get import harvest_source_show, \ - harvest_job_list, _get_sources_for_user - +from ckanext.harvest.logic.action.get import ( + harvest_source_show, harvest_job_list, _get_sources_for_user) log = logging.getLogger(__name__) @@ -71,10 +70,8 @@ def harvest_source_update(context, data_dict): type. Should be a serialized as JSON. (optional) :type config: string - :returns: the newly created harvest source :rtype: dictionary - ''' log.info('Updating harvest source: %r', data_dict) @@ -112,11 +109,11 @@ def harvest_source_clear(context, data_dict): model = context['model'] - sql = "select id from related where id in (select related_id from " \ - "related_dataset where dataset_id in (select package_id from " \ - "harvest_object where harvest_source_id = " \ - "'{harvest_source_id}'));".format( - harvest_source_id=harvest_source_id) + sql = '''select id from related where id in ( + select related_id from related_dataset where dataset_id in ( + select package_id from harvest_object + where harvest_source_id = '{harvest_source_id}'));'''.format( + harvest_source_id=harvest_source_id) result = model.Session.execute(sql) ids = [] for row in result: @@ -124,60 +121,84 @@ def harvest_source_clear(context, data_dict): related_ids = "('" + "','".join(ids) + "')" sql = '''begin; - update package set state = 'to_delete' where id in (select package_id from 
harvest_object where harvest_source_id = '{harvest_source_id}');'''.format( + update package set state = 'to_delete' where id in ( + select package_id from harvest_object + where harvest_source_id = '{harvest_source_id}');'''.format( harvest_source_id=harvest_source_id) # CKAN-2.3 or above: delete resource views, resource revisions & resources if toolkit.check_ckan_version(min_version='2.3'): sql += ''' - delete from resource_view where resource_id in (select id from resource where package_id in (select id from package where state = 'to_delete' )); - delete from resource_revision where package_id in (select id from package where state = 'to_delete' ); - delete from resource where package_id in (select id from package where state = 'to_delete' ); + delete from resource_view where resource_id in ( + select id from resource where package_id in ( + select id from package where state = 'to_delete')); + delete from resource_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from resource where package_id in ( + select id from package where state = 'to_delete'); ''' # Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3) else: sql += ''' - delete from resource_revision where resource_group_id in - (select id from resource_group where package_id in - (select id from package where state = 'to_delete')); - delete from resource where resource_group_id in - (select id from resource_group where package_id in - (select id from package where state = 'to_delete')); - delete from resource_group_revision where package_id in - (select id from package where state = 'to_delete'); - delete from resource_group where package_id in - (select id from package where state = 'to_delete'); + delete from resource_revision where resource_group_id in ( + select id from resource_group where package_id in ( + select id from package where state = 'to_delete')); + delete from resource where resource_group_id in ( + select id from resource_group where 
package_id in ( + select id from package where state = 'to_delete')); + delete from resource_group_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from resource_group where package_id in ( + select id from package where state = 'to_delete'); ''' # CKAN pre-2.5: authz models were removed in migration 078 if toolkit.check_ckan_version(max_version='2.4.99'): sql += ''' - delete from package_role where package_id in - (select id from package where state = 'to_delete'); - delete from user_object_role where id not in - (select user_object_role_id from package_role) and context = 'Package'; + delete from package_role where package_id in ( + select id from package where state = 'to_delete'); + delete from user_object_role where id not in ( + select user_object_role_id from package_role) + and context = 'Package'; ''' - sql += ''' - delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); - delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object_error where harvest_object_id in ( + select id from harvest_object + where harvest_source_id = '{harvest_source_id}'); + delete from harvest_object_extra where harvest_object_id in ( + select id from harvest_object + where harvest_source_id = '{harvest_source_id}'); delete from harvest_object where harvest_source_id = '{harvest_source_id}'; - delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}'); + delete from harvest_gather_error where harvest_job_id in ( + select id from harvest_job where source_id = '{harvest_source_id}'); delete from harvest_job where source_id = '{harvest_source_id}'; - delete from package_tag_revision where package_id in (select id from package where state = 'to_delete'); - delete from member_revision 
where table_id in (select id from package where state = 'to_delete'); - delete from package_extra_revision where package_id in (select id from package where state = 'to_delete'); - delete from package_revision where id in (select id from package where state = 'to_delete'); - delete from package_tag where package_id in (select id from package where state = 'to_delete'); - delete from package_extra where package_id in (select id from package where state = 'to_delete'); - delete from package_relationship_revision where subject_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship_revision where object_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship where subject_package_id in (select id from package where state = 'to_delete'); - delete from package_relationship where object_package_id in (select id from package where state = 'to_delete'); - delete from member where table_id in (select id from package where state = 'to_delete'); - delete from related_dataset where dataset_id in (select id from package where state = 'to_delete'); + delete from package_tag_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from member_revision where table_id in ( + select id from package where state = 'to_delete'); + delete from package_extra_revision where package_id in ( + select id from package where state = 'to_delete'); + delete from package_revision where id in ( + select id from package where state = 'to_delete'); + delete from package_tag where package_id in ( + select id from package where state = 'to_delete'); + delete from package_extra where package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where subject_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship_revision where object_package_id in ( + select id from package where 
state = 'to_delete'); + delete from package_relationship where subject_package_id in ( + select id from package where state = 'to_delete'); + delete from package_relationship where object_package_id in ( + select id from package where state = 'to_delete'); + delete from member where table_id in ( + select id from package where state = 'to_delete'); + delete from related_dataset where dataset_id in ( + select id from package where state = 'to_delete'); delete from related where id in {related_ids}; - delete from package where id in (select id from package where state = 'to_delete'); + delete from package where id in ( + select id from package where state = 'to_delete'); commit; '''.format( harvest_source_id=harvest_source_id, related_ids=related_ids) @@ -190,7 +211,7 @@ def harvest_source_clear(context, data_dict): return {'id': harvest_source_id} -def harvest_source_index_clear(context,data_dict): +def harvest_source_index_clear(context, data_dict): ''' Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself. 
This is useful to clean history of long running @@ -199,8 +220,8 @@ def harvest_source_index_clear(context,data_dict): :type id: string ''' - check_access('harvest_source_clear',context,data_dict) - harvest_source_id = data_dict.get('id',None) + check_access('harvest_source_clear', context, data_dict) + harvest_source_id = data_dict.get('id') source = HarvestSource.get(harvest_source_id) if not source: @@ -210,8 +231,8 @@ def harvest_source_index_clear(context,data_dict): harvest_source_id = source.id conn = make_connection() - query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id, - config.get('ckan.site_id')) + query = ''' +%s:"%s" +site_id:"%s" ''' % ( + 'harvest_source_id', harvest_source_id, config.get('ckan.site_id')) try: conn.delete_query(query) if asbool(config.get('ckan.search.solr_commit', 'true')): @@ -279,7 +300,7 @@ def harvest_objects_import(context, data_dict): model = context['model'] session = context['session'] - source_id = data_dict.get('source_id',) + source_id = data_dict.get('source_id') harvest_object_id = data_dict.get('harvest_object_id') package_id_or_name = data_dict.get('package_id') @@ -297,25 +318,29 @@ def harvest_objects_import(context, data_dict): log.warn('Harvest source %s is not active.', source_id) raise Exception('This harvest source is not active') - last_objects_ids = session.query(HarvestObject.id) \ - .join(HarvestSource) \ - .filter(HarvestObject.source == source) \ - .filter(HarvestObject.current == True) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .join(HarvestSource) \ + .filter(HarvestObject.source == source) \ + .filter(HarvestObject.current == True) elif harvest_object_id: - last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.id == harvest_object_id) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.id == harvest_object_id) elif package_id_or_name: - last_objects_ids = session.query(HarvestObject.id) \ - 
.join(Package) \ - .filter(HarvestObject.current == True) \ - .filter(Package.state == u'active') \ - .filter(or_(Package.id == package_id_or_name, - Package.name == package_id_or_name)) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .join(Package) \ + .filter(HarvestObject.current == True) \ + .filter(Package.state == u'active') \ + .filter(or_(Package.id == package_id_or_name, + Package.name == package_id_or_name)) join_datasets = False else: - last_objects_ids = session.query(HarvestObject.id) \ - .filter(HarvestObject.current == True) + last_objects_ids = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.current == True) if join_datasets: last_objects_ids = last_objects_ids.join(Package) \ @@ -327,7 +352,7 @@ def harvest_objects_import(context, data_dict): for obj_id in last_objects_ids: if segments and \ - str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: + str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments: continue obj = session.query(HarvestObject).get(obj_id) @@ -343,7 +368,7 @@ def harvest_objects_import(context, data_dict): return last_objects_count -def _caluclate_next_run(frequency): +def _calculate_next_run(frequency): now = datetime.datetime.utcnow() if frequency == 'ALWAYS': @@ -378,10 +403,10 @@ def _make_scheduled_jobs(context, data_dict): data_dict = {'source_id': source.id} try: get_action('harvest_job_create')(context, data_dict) - except HarvestJobExists, e: + except HarvestJobExists: log.info('Trying to rerun job for %s skipping' % source.id) - source.next_run = _caluclate_next_run(source.frequency) + source.next_run = _calculate_next_run(source.frequency) source.save() @@ -404,11 +429,12 @@ def harvest_jobs_run(context, data_dict): if len(jobs): for job in jobs: if job['gather_finished']: - objects = session.query(HarvestObject.id) \ - .filter(HarvestObject.harvest_job_id == job['id']) \ - .filter(and_((HarvestObject.state != u'COMPLETE'), - (HarvestObject.state != u'ERROR'))) \ - 
.order_by(HarvestObject.import_finished.desc()) + objects = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.harvest_job_id == job['id']) \ + .filter(and_((HarvestObject.state != u'COMPLETE'), + (HarvestObject.state != u'ERROR'))) \ + .order_by(HarvestObject.import_finished.desc()) if objects.count() == 0: job_obj = HarvestJob.get(job['id']) @@ -471,7 +497,7 @@ def harvest_job_abort(context, data_dict): model = context['model'] - source_id = data_dict.get('source_id', None) + source_id = data_dict.get('source_id') source = harvest_source_show(context, {'id': source_id}) # HarvestJob set status to 'Finished' @@ -550,8 +576,8 @@ def harvest_source_reindex(context, data_dict): if 'extras_as_string'in context: del context['extras_as_string'] context.update({'ignore_auth': True}) - package_dict = logic.get_action('harvest_source_show')(context, - {'id': harvest_source_id}) + package_dict = logic.get_action('harvest_source_show')( + context, {'id': harvest_source_id}) log.debug('Updating search index for harvest source: {0}'.format( package_dict.get('name') or harvest_source_id)) diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index 01e73ef..bb1ab12 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -17,7 +17,7 @@ log = logging.getLogger(__name__) def harvest_source_id_exists(value, context): - result = HarvestSource.get(value, None) + result = HarvestSource.get(value) if not result: raise Invalid('Harvest Source with id %r does not exist.' % str(value)) @@ -26,8 +26,7 @@ def harvest_source_id_exists(value, context): def harvest_job_exists(value, context): '''Check if a harvest job exists and returns the model if it does''' - - result = HarvestJob.get(value, None) + result = HarvestJob.get(value) if not result: raise Invalid('Harvest Job with id %r does not exist.' 
% str(value)) @@ -52,10 +51,10 @@ def _normalize_url(url): path = o.path.rstrip('/') check_url = urlparse.urlunparse(( - o.scheme, - netloc, - path, - None, None, None)) + o.scheme, + netloc, + path, + None, None, None)) return check_url @@ -80,12 +79,11 @@ def harvest_source_url_validator(key, data, errors, context): new_url = _normalize_url(data[key]) - q = model.Session.query( - model.Package.id, model.Package.url) \ - .filter(model.Package.type == DATASET_TYPE_NAME) + q = model.Session.query(model.Package.id, model.Package.url) \ + .filter(model.Package.type == DATASET_TYPE_NAME) if package_id: - # When editing a source we need to avoid its own URL. + # When editing a source we need to avoid its own URL q = q.filter(model.Package.id != package_id) existing_sources = q.all() @@ -102,10 +100,8 @@ def harvest_source_url_validator(key, data, errors, context): if url == new_url and conf == new_config: # You can have a duplicate URL if it's pointing to a unique # set as it will be harvesting unique datasets. - raise Invalid( - 'There already is a Harvest Source for this URL: %s' - % data[key] - ) + raise Invalid('There already is a Harvest Source for this URL: %s' + % data[key]) return data[key] @@ -118,18 +114,14 @@ def harvest_source_type_exists(value, context): for harvester in PluginImplementations(IHarvester): info = harvester.info() if not info or 'name' not in info: - log.error( - 'Harvester %r does not provide the harvester name in the info ' - 'response' % str(harvester) - ) + log.error('Harvester %s does not provide the harvester name in ' - 'the info response' % harvester) continue available_types.append(info['name']) - if value not in available_types: - raise Invalid( - 'Unknown harvester type: %s. Have you registered a harvester for ' - 'this type?' % value - ) + if value not in available_types: + raise Invalid('Unknown harvester type: %s. Have you registered a ' + 'harvester for this type?'
% value) return value @@ -143,8 +135,8 @@ def harvest_source_config_validator(key, data, errors, context): try: return harvester.validate_config(data[key]) except Exception, e: - raise Invalid( - 'Error parsing the configuration options: %s' % str(e)) + raise Invalid('Error parsing the configuration options: %s' + % e) else: return data[key]