PEP8 based on #174

Conflicts:
	ckanext/harvest/logic/action/delete.py
	ckanext/harvest/logic/action/update.py
	ckanext/harvest/logic/validators.py
David Read 2015-11-03 20:30:11 +00:00
parent 208d1c4185
commit 10685badb5
4 changed files with 125 additions and 107 deletions

View File

@@ -148,15 +148,15 @@ def _check_for_existing_jobs(context, source_id):
     return exist


 def harvest_object_create(context, data_dict):
-    """ Create a new harvest object
+    ''' Create a new harvest object

     :type guid: string (optional)
     :type content: string (optional)
     :type job_id: string
     :type source_id: string (optional)
     :type package_id: string (optional)
     :type extras: dict (optional)
-    """
+    '''
     check_access('harvest_object_create', context, data_dict)
     data, errors = _validate(data_dict, harvest_object_create_schema(), context)

View File

@@ -2,7 +2,6 @@ import logging
 from ckan import plugins as p

 log = logging.getLogger(__name__)
@@ -23,5 +22,6 @@ def harvest_source_delete(context, data_dict):
     if context.get('clear_source', False):
         # We need the id, the name won't work.
         package_dict = p.toolkit.get_action('package_show')(context, data_dict)
         p.toolkit.get_action('harvest_source_clear')(
             context, {'id': package_dict['id']})

View File

@@ -29,9 +29,8 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
 from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError
 from ckanext.harvest.logic.dictization import harvest_job_dictize
-from ckanext.harvest.logic.action.get import harvest_source_show, \
-    harvest_job_list, _get_sources_for_user
+from ckanext.harvest.logic.action.get import (
+    harvest_source_show, harvest_job_list, _get_sources_for_user)

 log = logging.getLogger(__name__)
@@ -71,10 +70,8 @@ def harvest_source_update(context, data_dict):
         type. Should be a serialized as JSON. (optional)
     :type config: string

     :returns: the newly created harvest source
     :rtype: dictionary
     '''

     log.info('Updating harvest source: %r', data_dict)
@@ -112,11 +109,11 @@ def harvest_source_clear(context, data_dict):
     model = context['model']

-    sql = "select id from related where id in (select related_id from " \
-          "related_dataset where dataset_id in (select package_id from " \
-          "harvest_object where harvest_source_id = " \
-          "'{harvest_source_id}'));".format(
+    sql = '''select id from related where id in (
+              select related_id from related_dataset where dataset_id in (
+                  select package_id from harvest_object
+                  where harvest_source_id = '{harvest_source_id}'));'''.format(
         harvest_source_id=harvest_source_id)

     result = model.Session.execute(sql)
     ids = []
     for row in result:
@@ -124,60 +121,84 @@ def harvest_source_clear(context, data_dict):
     related_ids = "('" + "','".join(ids) + "')"

     sql = '''begin;
-    update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format(
+    update package set state = 'to_delete' where id in (
+        select package_id from harvest_object
+        where harvest_source_id = '{harvest_source_id}');'''.format(
         harvest_source_id=harvest_source_id)

     # CKAN-2.3 or above: delete resource views, resource revisions & resources
     if toolkit.check_ckan_version(min_version='2.3'):
         sql += '''
-        delete from resource_view where resource_id in (select id from resource where package_id in (select id from package where state = 'to_delete' ));
-        delete from resource_revision where package_id in (select id from package where state = 'to_delete' );
-        delete from resource where package_id in (select id from package where state = 'to_delete' );
+        delete from resource_view where resource_id in (
+            select id from resource where package_id in (
+                select id from package where state = 'to_delete'));
+        delete from resource_revision where package_id in (
+            select id from package where state = 'to_delete');
+        delete from resource where package_id in (
+            select id from package where state = 'to_delete');
         '''
     # Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
     else:
         sql += '''
-        delete from resource_revision where resource_group_id in
-        (select id from resource_group where package_id in
-        (select id from package where state = 'to_delete'));
-        delete from resource where resource_group_id in
-        (select id from resource_group where package_id in
-        (select id from package where state = 'to_delete'));
-        delete from resource_group_revision where package_id in
-        (select id from package where state = 'to_delete');
-        delete from resource_group where package_id in
-        (select id from package where state = 'to_delete');
+        delete from resource_revision where resource_group_id in (
+            select id from resource_group where package_id in (
+                select id from package where state = 'to_delete'));
+        delete from resource where resource_group_id in (
+            select id from resource_group where package_id in (
+                select id from package where state = 'to_delete'));
+        delete from resource_group_revision where package_id in (
+            select id from package where state = 'to_delete');
+        delete from resource_group where package_id in (
+            select id from package where state = 'to_delete');
         '''
     # CKAN pre-2.5: authz models were removed in migration 078
     if toolkit.check_ckan_version(max_version='2.4.99'):
         sql += '''
-        delete from package_role where package_id in
-        (select id from package where state = 'to_delete');
-        delete from user_object_role where id not in
-        (select user_object_role_id from package_role) and context = 'Package';
+        delete from package_role where package_id in (
+            select id from package where state = 'to_delete');
+        delete from user_object_role where id not in (
+            select user_object_role_id from package_role)
+            and context = 'Package';
         '''

     sql += '''
-    delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
-    delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
+    delete from harvest_object_error where harvest_object_id in (
+        select id from harvest_object
+        where harvest_source_id = '{harvest_source_id}');
+    delete from harvest_object_extra where harvest_object_id in (
+        select id from harvest_object
+        where harvest_source_id = '{harvest_source_id}');
     delete from harvest_object where harvest_source_id = '{harvest_source_id}';
-    delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}');
+    delete from harvest_gather_error where harvest_job_id in (
+        select id from harvest_job where source_id = '{harvest_source_id}');
     delete from harvest_job where source_id = '{harvest_source_id}';
-    delete from package_tag_revision where package_id in (select id from package where state = 'to_delete');
-    delete from member_revision where table_id in (select id from package where state = 'to_delete');
-    delete from package_extra_revision where package_id in (select id from package where state = 'to_delete');
-    delete from package_revision where id in (select id from package where state = 'to_delete');
-    delete from package_tag where package_id in (select id from package where state = 'to_delete');
-    delete from package_extra where package_id in (select id from package where state = 'to_delete');
-    delete from package_relationship_revision where subject_package_id in (select id from package where state = 'to_delete');
-    delete from package_relationship_revision where object_package_id in (select id from package where state = 'to_delete');
-    delete from package_relationship where subject_package_id in (select id from package where state = 'to_delete');
-    delete from package_relationship where object_package_id in (select id from package where state = 'to_delete');
-    delete from member where table_id in (select id from package where state = 'to_delete');
-    delete from related_dataset where dataset_id in (select id from package where state = 'to_delete');
+    delete from package_tag_revision where package_id in (
+        select id from package where state = 'to_delete');
+    delete from member_revision where table_id in (
+        select id from package where state = 'to_delete');
+    delete from package_extra_revision where package_id in (
+        select id from package where state = 'to_delete');
+    delete from package_revision where id in (
+        select id from package where state = 'to_delete');
+    delete from package_tag where package_id in (
+        select id from package where state = 'to_delete');
+    delete from package_extra where package_id in (
+        select id from package where state = 'to_delete');
+    delete from package_relationship_revision where subject_package_id in (
+        select id from package where state = 'to_delete');
+    delete from package_relationship_revision where object_package_id in (
+        select id from package where state = 'to_delete');
+    delete from package_relationship where subject_package_id in (
+        select id from package where state = 'to_delete');
+    delete from package_relationship where object_package_id in (
+        select id from package where state = 'to_delete');
+    delete from member where table_id in (
+        select id from package where state = 'to_delete');
+    delete from related_dataset where dataset_id in (
+        select id from package where state = 'to_delete');
     delete from related where id in {related_ids};
-    delete from package where id in (select id from package where state = 'to_delete');
+    delete from package where id in (
+        select id from package where state = 'to_delete');
     commit;
     '''.format(
         harvest_source_id=harvest_source_id, related_ids=related_ids)
@@ -190,7 +211,7 @@ def harvest_source_clear(context, data_dict):
     return {'id': harvest_source_id}


-def harvest_source_index_clear(context,data_dict):
+def harvest_source_index_clear(context, data_dict):
     '''
     Clears all datasets, jobs and objects related to a harvest source, but
     keeps the source itself. This is useful to clean history of long running
@@ -199,8 +220,8 @@ def harvest_source_index_clear(context,data_dict):
     :type id: string
     '''
-    check_access('harvest_source_clear',context,data_dict)
-    harvest_source_id = data_dict.get('id',None)
+    check_access('harvest_source_clear', context, data_dict)
+    harvest_source_id = data_dict.get('id')

     source = HarvestSource.get(harvest_source_id)
     if not source:
@@ -210,8 +231,8 @@ def harvest_source_index_clear(context,data_dict):
     harvest_source_id = source.id

     conn = make_connection()
-    query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id,
-                                              config.get('ckan.site_id'))
+    query = ''' +%s:"%s" +site_id:"%s" ''' % (
+        'harvest_source_id', harvest_source_id, config.get('ckan.site_id'))
     try:
         conn.delete_query(query)
         if asbool(config.get('ckan.search.solr_commit', 'true')):
@@ -279,7 +300,7 @@ def harvest_objects_import(context, data_dict):
     model = context['model']
     session = context['session']
-    source_id = data_dict.get('source_id',)
+    source_id = data_dict.get('source_id')
     harvest_object_id = data_dict.get('harvest_object_id')
     package_id_or_name = data_dict.get('package_id')
@@ -297,25 +318,29 @@ def harvest_objects_import(context, data_dict):
             log.warn('Harvest source %s is not active.', source_id)
             raise Exception('This harvest source is not active')

-        last_objects_ids = session.query(HarvestObject.id) \
-            .join(HarvestSource) \
-            .filter(HarvestObject.source == source) \
-            .filter(HarvestObject.current == True)
+        last_objects_ids = \
+            session.query(HarvestObject.id) \
+            .join(HarvestSource) \
+            .filter(HarvestObject.source == source) \
+            .filter(HarvestObject.current == True)
     elif harvest_object_id:
-        last_objects_ids = session.query(HarvestObject.id) \
-            .filter(HarvestObject.id == harvest_object_id)
+        last_objects_ids = \
+            session.query(HarvestObject.id) \
+            .filter(HarvestObject.id == harvest_object_id)
     elif package_id_or_name:
-        last_objects_ids = session.query(HarvestObject.id) \
-            .join(Package) \
-            .filter(HarvestObject.current == True) \
-            .filter(Package.state == u'active') \
-            .filter(or_(Package.id == package_id_or_name,
-                        Package.name == package_id_or_name))
+        last_objects_ids = \
+            session.query(HarvestObject.id) \
+            .join(Package) \
+            .filter(HarvestObject.current == True) \
+            .filter(Package.state == u'active') \
+            .filter(or_(Package.id == package_id_or_name,
+                        Package.name == package_id_or_name))
         join_datasets = False
     else:
-        last_objects_ids = session.query(HarvestObject.id) \
-            .filter(HarvestObject.current == True)
+        last_objects_ids = \
+            session.query(HarvestObject.id) \
+            .filter(HarvestObject.current == True)

     if join_datasets:
         last_objects_ids = last_objects_ids.join(Package) \
@@ -327,7 +352,7 @@ def harvest_objects_import(context, data_dict):
     for obj_id in last_objects_ids:
         if segments and \
                 str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
             continue

         obj = session.query(HarvestObject).get(obj_id)
@@ -343,7 +368,7 @@ def harvest_objects_import(context, data_dict):
     return last_objects_count


-def _caluclate_next_run(frequency):
+def _calculate_next_run(frequency):
     now = datetime.datetime.utcnow()
     if frequency == 'ALWAYS':
@@ -378,10 +403,10 @@ def _make_scheduled_jobs(context, data_dict):
         data_dict = {'source_id': source.id}
         try:
             get_action('harvest_job_create')(context, data_dict)
-        except HarvestJobExists, e:
+        except HarvestJobExists:
             log.info('Trying to rerun job for %s skipping' % source.id)
-        source.next_run = _caluclate_next_run(source.frequency)
+        source.next_run = _calculate_next_run(source.frequency)
         source.save()
@@ -404,11 +429,12 @@ def harvest_jobs_run(context, data_dict):
     if len(jobs):
         for job in jobs:
             if job['gather_finished']:
-                objects = session.query(HarvestObject.id) \
-                    .filter(HarvestObject.harvest_job_id == job['id']) \
-                    .filter(and_((HarvestObject.state != u'COMPLETE'),
-                                 (HarvestObject.state != u'ERROR'))) \
-                    .order_by(HarvestObject.import_finished.desc())
+                objects = \
+                    session.query(HarvestObject.id) \
+                    .filter(HarvestObject.harvest_job_id == job['id']) \
+                    .filter(and_((HarvestObject.state != u'COMPLETE'),
+                                 (HarvestObject.state != u'ERROR'))) \
+                    .order_by(HarvestObject.import_finished.desc())

                 if objects.count() == 0:
                     job_obj = HarvestJob.get(job['id'])
@@ -471,7 +497,7 @@ def harvest_job_abort(context, data_dict):
     model = context['model']

-    source_id = data_dict.get('source_id', None)
+    source_id = data_dict.get('source_id')
     source = harvest_source_show(context, {'id': source_id})

     # HarvestJob set status to 'Finished'
@@ -550,8 +576,8 @@ def harvest_source_reindex(context, data_dict):
     if 'extras_as_string'in context:
         del context['extras_as_string']
     context.update({'ignore_auth': True})
-    package_dict = logic.get_action('harvest_source_show')(context,
-        {'id': harvest_source_id})
+    package_dict = logic.get_action('harvest_source_show')(
+        context, {'id': harvest_source_id})
     log.debug('Updating search index for harvest source: {0}'.format(
         package_dict.get('name') or harvest_source_id))

View File

@@ -17,7 +17,7 @@ log = logging.getLogger(__name__)
 def harvest_source_id_exists(value, context):
-    result = HarvestSource.get(value, None)
+    result = HarvestSource.get(value)
     if not result:
         raise Invalid('Harvest Source with id %r does not exist.' % str(value))
@@ -26,8 +26,7 @@ def harvest_source_id_exists(value, context):
 def harvest_job_exists(value, context):
     '''Check if a harvest job exists and returns the model if it does'''
-
-    result = HarvestJob.get(value, None)
+    result = HarvestJob.get(value)
     if not result:
         raise Invalid('Harvest Job with id %r does not exist.' % str(value))
@@ -52,10 +51,10 @@ def _normalize_url(url):
     path = o.path.rstrip('/')
     check_url = urlparse.urlunparse((
         o.scheme,
         netloc,
         path,
         None, None, None))

     return check_url
@@ -80,12 +79,11 @@ def harvest_source_url_validator(key, data, errors, context):
     new_url = _normalize_url(data[key])

-    q = model.Session.query(
-        model.Package.id, model.Package.url) \
-        .filter(model.Package.type == DATASET_TYPE_NAME)
+    q = model.Session.query(model.Package.id, model.Package.url) \
+        .filter(model.Package.type == DATASET_TYPE_NAME)

     if package_id:
-        # When editing a source we need to avoid its own URL.
+        # When editing a source we need to avoid its own URL
         q = q.filter(model.Package.id != package_id)

     existing_sources = q.all()
@@ -102,10 +100,8 @@ def harvest_source_url_validator(key, data, errors, context):
         if url == new_url and conf == new_config:
             # You can have a duplicate URL if it's pointing to a unique
             # set as it will be harvesting unique datasets.
-            raise Invalid(
-                'There already is a Harvest Source for this URL: %s'
-                % data[key]
-            )
+            raise Invalid('There already is a Harvest Source for this URL: %s'
+                          % data[key])

     return data[key]
@@ -118,18 +114,14 @@ def harvest_source_type_exists(value, context):
     for harvester in PluginImplementations(IHarvester):
         info = harvester.info()
         if not info or 'name' not in info:
-            log.error(
-                'Harvester %r does not provide the harvester name in the info '
-                'response' % str(harvester)
-            )
+            log.error('Harvester %s does not provide the harvester name in '
+                      'the info response' % harvester)
             continue
         available_types.append(info['name'])

-    if value not in available_types:
-        raise Invalid(
-            'Unknown harvester type: %s. Have you registered a harvester for '
-            'this type?' % value
-        )
+    if not value in available_types:
+        raise Invalid('Unknown harvester type: %s. Have you registered a '
+                      'harvester for this type?' % value)

     return value
@@ -143,8 +135,8 @@ def harvest_source_config_validator(key, data, errors, context):
         try:
             return harvester.validate_config(data[key])
         except Exception, e:
-            raise Invalid(
-                'Error parsing the configuration options: %s' % str(e))
+            raise Invalid('Error parsing the configuration options: %s'
+                          % e)
     else:
         return data[key]